BIND 10 trac598, updated. b52e19e101e1a046e983d3fd3146aa49487a93cd [trac598] Add comments and refactor the code of IOFetch and ForwardQuery

BIND 10 source code commits bind10-changes at lists.isc.org
Fri Mar 18 09:48:38 UTC 2011


The branch, trac598 has been updated
       via  b52e19e101e1a046e983d3fd3146aa49487a93cd (commit)
       via  ab30677909624dac9efba8b8b8fe7e52db503875 (commit)
      from  306da065b7635b9d29fbafee45b1c77a8e43f05c (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.

- Log -----------------------------------------------------------------
commit b52e19e101e1a046e983d3fd3146aa49487a93cd
Author: zhanglikun <zhanglikun at cnnic.cn>
Date:   Fri Mar 18 17:35:38 2011 +0800

    [trac598] Add comments and refactor the code of IOFetch and ForwardQuery

commit ab30677909624dac9efba8b8b8fe7e52db503875
Author: zhanglikun <zhanglikun at cnnic.cn>
Date:   Fri Mar 18 14:30:25 2011 +0800

    [trac598] Add class ForwardQuery for forward query mode and remove forward query logic from class RunningQuery

-----------------------------------------------------------------------

Summary of changes:
 src/lib/asiolink/io_fetch.cc                       |   44 ++--
 src/lib/asiolink/io_fetch.h                        |   20 ++-
 src/lib/asiolink/recursive_query.cc                |  275 +++++++++++++++-----
 src/lib/asiolink/recursive_query.h                 |   12 +-
 src/lib/asiolink/tests/recursive_query_unittest.cc |   21 ++-
 5 files changed, 271 insertions(+), 101 deletions(-)

-----------------------------------------------------------------------
diff --git a/src/lib/asiolink/io_fetch.cc b/src/lib/asiolink/io_fetch.cc
index 222ccea..285a464 100644
--- a/src/lib/asiolink/io_fetch.cc
+++ b/src/lib/asiolink/io_fetch.cc
@@ -56,60 +56,60 @@ namespace {
         return (data);
     }
 
-    void copyMessageFlag(ConstMessagePtr src, MessagePtr dst,
-                         const Message::HeaderFlag flag) {
-        dst->setHeaderFlag(flag, src->getHeaderFlag(flag));
-    }
 }
 
 namespace asiolink {
 /// IOFetch Constructor - just initialize the private data
 
-IOFetch::IOFetch(int protocol, IOService& service,
-    const isc::dns::Question& question, const IOAddress& address, uint16_t port,
-    isc::dns::OutputBufferPtr& buff, Callback* cb, int wait)
+void
+IOFetch::initIOFetch(MessagePtr& query_msg, int protocol, IOService& service,
+                     const isc::dns::Question& question, const IOAddress& address,
+                     uint16_t port, isc::dns::OutputBufferPtr& buff, 
+                     Callback* cb, int wait)
 {
-    MessagePtr query_msg(new Message(Message::RENDER));
-    
     query_msg->setQid(QidGenerator::getInstance().generateQid());
     query_msg->setOpcode(Opcode::QUERY());
     query_msg->setRcode(Rcode::NOERROR());
     query_msg->setHeaderFlag(Message::HEADERFLAG_RD);
     query_msg->addQuestion(question);
-
     data_ = boost::shared_ptr<IOFetchData>(new IOFetch::IOFetchData(
         protocol, service, query_msg, address,
         port, buff, cb, wait));
 }
 
 IOFetch::IOFetch(int protocol, IOService& service,
+    const isc::dns::Question& question, const IOAddress& address, uint16_t port,
+    isc::dns::OutputBufferPtr& buff, Callback* cb, int wait)
+{
+    MessagePtr query_msg(new Message(Message::RENDER));
+    initIOFetch(query_msg, protocol, service, question, address, port, buff,
+                cb, wait);
+}
+
+IOFetch::IOFetch(int protocol, IOService& service,
     ConstMessagePtr query_message, const IOAddress& address, uint16_t port,
     isc::dns::OutputBufferPtr& buff, Callback* cb, int wait)
 {
     MessagePtr msg(new Message(Message::RENDER));
+    initIOFetch(msg, protocol, service,
+                **(query_message->beginQuestion()),
+                address, port, buff,
+                cb, wait);
 
-    msg->setQid(QidGenerator::getInstance().generateQid());
-    msg->setOpcode(Opcode::QUERY());
-    msg->setRcode(Rcode::NOERROR());
-    copyMessageFlag(query_message, msg, Message::HEADERFLAG_RD);
-    copyMessageFlag(query_message, msg, Message::HEADERFLAG_CD);
+    Message::HeaderFlag flag = Message::HEADERFLAG_RD;
+    msg->setHeaderFlag(flag, query_message->getHeaderFlag(flag));
+    flag = Message::HEADERFLAG_CD;
+    msg->setHeaderFlag(flag, query_message->getHeaderFlag(flag));
 
     ConstEDNSPtr edns(query_message->getEDNS());
     const bool dnssec_ok = edns && edns->getDNSSECAwareness();
     if (edns) {
         EDNSPtr edns_response(new EDNS());
         edns_response->setDNSSECAwareness(dnssec_ok);
-
         // TODO: We should make our own edns bufsize length configurable
         edns_response->setUDPSize(Message::DEFAULT_MAX_EDNS0_UDPSIZE);
         msg->setEDNS(edns_response);
     }
-
-    msg->addQuestion(*(*query_message->beginQuestion()));
-
-    data_ = boost::shared_ptr<IOFetchData>(new IOFetch::IOFetchData(
-        protocol, service, msg, address,
-        port, buff, cb, wait));
 }
 
 /// The function operator is implemented with the "stackless coroutine"
diff --git a/src/lib/asiolink/io_fetch.h b/src/lib/asiolink/io_fetch.h
index db84ef1..6fdce6c 100644
--- a/src/lib/asiolink/io_fetch.h
+++ b/src/lib/asiolink/io_fetch.h
@@ -199,12 +199,19 @@ public:
         const isc::dns::Question& question, const IOAddress& address,
         uint16_t port, isc::dns::OutputBufferPtr& buff, Callback* cb,
         int wait = -1);
- 
+    
+    /// \brief Constructor
+    ///  This constructor takes the parameter "query_message", which
+    ///  is a shared_ptr to a full query message. It differs from the
+    ///  constructor above, which takes only a question. All other
+    ///  parameters are the same.
+    ///
+    /// \param query_message the shared_ptr to a full query message.
     IOFetch(int protocol, IOService& service,
         isc::dns::ConstMessagePtr query_message, const IOAddress& address,
         uint16_t port, isc::dns::OutputBufferPtr& buff, Callback* cb,
         int wait = -1);
-    
+
     /// \brief Coroutine entry point
     ///
     /// The operator() method is the method in which the coroutine code enters
@@ -224,6 +231,15 @@ public:
     void stop(Result reason = STOPPED);
 
 private:
+    /// \brief IOFetch Initialization Function.
+    /// All parameters are the same as those of the constructor, except
+    /// for the parameter "query_message".
+    /// \param query_message the message to be sent out.
+    void initIOFetch(isc::dns::MessagePtr& query_message, int protocol,
+            IOService& service, const isc::dns::Question& question,
+            const IOAddress& address, uint16_t port,
+            isc::dns::OutputBufferPtr& buff, Callback* cb, int wait);
+
     boost::shared_ptr<IOFetchData>  data_;   ///< Private data
 };
 
diff --git a/src/lib/asiolink/recursive_query.cc b/src/lib/asiolink/recursive_query.cc
index 678f778..9a069b3 100644
--- a/src/lib/asiolink/recursive_query.cc
+++ b/src/lib/asiolink/recursive_query.cc
@@ -84,10 +84,6 @@ private:
     // This is where we build and store our final answer
     MessagePtr answer_message_;
 
-    // currently we use upstream as the current list of NS records
-    // we should differentiate between forwarding and resolving
-    boost::shared_ptr<AddressVector> upstream_;
-
     // root servers...just copied over to the zone_servers_
     boost::shared_ptr<AddressVector> upstream_root_;
 
@@ -142,34 +138,9 @@ private:
 
     // (re)send the query to the server.
     void send() {
-        const int uc = upstream_->size();
         const int zs = zone_servers_.size();
         buffer_->clear();
-        if (uc > 0) {
-            int serverIndex = rand() % uc;
-            dlog("Sending upstream query (" + question_.toText() +
-                ") to " + upstream_->at(serverIndex).first);
-            // Forward the query, create the IOFetch with
-            // query message, so that query flags can be forwarded
-            // together.
-            IOFetch* query = NULL;
-            if (query_message_) {
-                query = new IOFetch(IPPROTO_UDP, io_, query_message_,
-                                    upstream_->at(serverIndex).first,
-                                    upstream_->at(serverIndex).second,
-                                    buffer_, this, query_timeout_);
- 
-            } else {
-                query = new IOFetch(IPPROTO_UDP, io_, question_,
-                                    upstream_->at(serverIndex).first,
-                                    upstream_->at(serverIndex).second,
-                                    buffer_, this, query_timeout_);
-            }
-            ++queries_out_;
-            std::cout << "==========================================post qury" << std::endl;
-            io_.get_io_service().post(*query);
-            delete query;
-        } else if (zs > 0) {
+        if (zs > 0) {
             int serverIndex = rand() % zs;
             dlog("Sending query to zone server (" + question_.toText() +
                 ") to " + zone_servers_.at(serverIndex).first);
@@ -183,7 +154,7 @@ private:
             dlog("Error, no upstream servers to send to.");
         }
     }
-    
+
     // This function is called by operator() if there is an actual
     // answer from a server and we are in recursive mode
     // depending on the contents, we go on recursing or return
@@ -329,7 +300,6 @@ public:
         question_(question),
         query_message_(),
         answer_message_(answer_message),
-        upstream_(upstream),
         upstream_root_(upstream_root),
         buffer_(buffer),
         resolvercallback_(cb),
@@ -357,15 +327,8 @@ public:
             client_timer.async_wait(boost::bind(&RunningQuery::clientTimeout, this));
         }
         
-        // should use NSAS for root servers
-        // Adding root servers if not a forwarder
-        if (upstream_->empty()) {
-            setZoneServersToRoot();
-        }
-    }
-
-    void setQueryMessage(ConstMessagePtr query_message) {
-        query_message_ = query_message;
+        setZoneServersToRoot();
+        doLookup();
     }
 
     void setZoneServersToRoot() {
@@ -420,7 +383,6 @@ public:
         // here again.
         // same goes if we have an outstanding query (can't delete
         // until that one comes back to us)
-        std::cout << "stop=====================================" << std::endl;
         done_ = true;
         if (resume && !answer_sent_) {
             answer_sent_ = true;
@@ -470,8 +432,7 @@ public:
             InputBuffer ibuf(buffer_->getData(), buffer_->getLength());
             incoming.fromWire(ibuf);
 
-            if (upstream_->size() == 0 &&
-                incoming.getRcode() == Rcode::NOERROR()) {
+            if (incoming.getRcode() == Rcode::NOERROR()) {
                 done_ = handleRecursiveAnswer(incoming);
             } else {
                 isc::resolve::copyResponseMessage(incoming, answer_message_);
@@ -492,6 +453,197 @@ public:
     }
 };
 
+class ForwardQuery : public IOFetch::Callback {
+private:
+    // The io service to handle async calls
+    IOService& io_;
+
+    // This is the query message received from the client
+    ConstMessagePtr query_message_;
+
+    // This is where we build and store our final answer
+    MessagePtr answer_message_;
+
+    // currently we use upstream as the current list of NS records
+    // we should differentiate between forwarding and resolving
+    boost::shared_ptr<AddressVector> upstream_;
+
+    // Buffer to store the result.
+    OutputBufferPtr buffer_;
+
+    // Server to notify when we succeed or fail
+    //shared_ptr<DNSServer> server_;
+    isc::resolve::ResolverInterface::CallbackPtr resolvercallback_;
+
+    /*
+     * TODO Do something more clever with timeouts. In the long term, some
+     *     computation of average RTT, increase with each retry, etc.
+     */
+    // Timeout information
+    int query_timeout_;
+    unsigned retries_;
+
+    // normal query state
+
+    // TODO: replace by our wrapper
+    asio::deadline_timer client_timer;
+    asio::deadline_timer lookup_timer;
+
+    size_t queries_out_;
+    
+    // If we timed out ourselves (lookup timeout), stop issuing queries
+    bool done_;
+
+    // If we have a client timeout, we send back an answer, but don't
+    // stop. We use this variable to make sure we don't send another
+    // answer if we do find one later (or if we have a lookup_timeout)
+    bool answer_sent_;
+
+    // Reference to our cache
+    isc::cache::ResolverCache& cache_;
+
+    // (re)send the query to the server.
+    void send() {
+        const int uc = upstream_->size();
+        buffer_->clear();
+        int serverIndex = rand() % uc;
+        ConstQuestionPtr question = *(query_message_->beginQuestion());
+        dlog("Sending upstream query (" + question->toText() +
+             ") to " + upstream_->at(serverIndex).first);
+        // Forward the query, create the IOFetch with
+        // query message, so that query flags can be forwarded
+        // together.
+        IOFetch query(IPPROTO_UDP, io_, query_message_,
+            upstream_->at(serverIndex).first,
+            upstream_->at(serverIndex).second,
+            buffer_, this, query_timeout_);
+
+        ++queries_out_;
+        io_.get_io_service().post(query);
+    }
+
+public:
+    ForwardQuery(IOService& io,
+        ConstMessagePtr query_message,
+        MessagePtr answer_message,
+        boost::shared_ptr<AddressVector> upstream,
+        OutputBufferPtr buffer,
+        isc::resolve::ResolverInterface::CallbackPtr cb,
+        int query_timeout, int client_timeout, int lookup_timeout,
+        unsigned retries,
+        isc::cache::ResolverCache& cache) :
+        io_(io),
+        query_message_(query_message),
+        answer_message_(answer_message),
+        upstream_(upstream),
+        buffer_(buffer),
+        resolvercallback_(cb),
+        query_timeout_(query_timeout),
+        retries_(retries),
+        client_timer(io.get_io_service()),
+        lookup_timer(io.get_io_service()),
+        queries_out_(0),
+        done_(false),
+        answer_sent_(false),
+        cache_(cache)
+    {
+        // Setup the timer to stop trying (lookup_timeout)
+        if (lookup_timeout >= 0) {
+            lookup_timer.expires_from_now(
+                boost::posix_time::milliseconds(lookup_timeout));
+            lookup_timer.async_wait(boost::bind(&ForwardQuery::stop, this, false));
+        }
+
+        // Setup the timer to send an answer (client_timeout)
+        if (client_timeout >= 0) {
+            client_timer.expires_from_now(
+                boost::posix_time::milliseconds(client_timeout));
+            client_timer.async_wait(boost::bind(&ForwardQuery::clientTimeout, this));
+        }
+
+        send();
+    }
+
+    virtual void clientTimeout() {
+        // Return a SERVFAIL, but do not stop until
+        // we have an answer or timeout ourselves
+        isc::resolve::makeErrorMessage(answer_message_,
+                                       Rcode::SERVFAIL());
+        if (!answer_sent_) {
+            answer_sent_ = true;
+            resolvercallback_->success(answer_message_);
+        }
+    }
+
+    virtual void stop(bool resume) {
+        // if we cancel our timers, we will still get an event for
+        // that, so we cannot delete ourselves just yet (those events
+        // would be bound to a deleted object)
+        // cancel them one by one, both cancels should get us back
+        // here again.
+        // same goes if we have an outstanding query (can't delete
+        // until that one comes back to us)
+        done_ = true;
+        if (resume && !answer_sent_) {
+            answer_sent_ = true;
+
+            // There are two types of messages we could store in the
+            // cache;
+            // 1. answers to our fetches from authoritative servers,
+            //    exactly as we receive them, and
+            // 2. answers to queries we received from clients, which
+            //    have received additional processing (following CNAME
+            //    chains, for instance)
+            //
+            // Doing only the first would mean we would have to re-do
+            // processing when we get data from our cache, and doing
+            // only the second would miss out on the side-effect of
+            // having nameserver data in our cache.
+            //
+            // So right now we do both. Since the cache (currently)
+            // stores Messages on their question section only, this
+            // does mean that we overwrite the messages we stored in
+            // the previous iteration if we are following a delegation.
+            cache_.update(*answer_message_);
+
+            resolvercallback_->success(answer_message_);
+        } else {
+            resolvercallback_->failure();
+        }
+        if (lookup_timer.cancel() != 0) {
+            return;
+        }
+        if (client_timer.cancel() != 0) {
+            return;
+        }
+        if (queries_out_ > 0) {
+            return;
+        }
+        delete this;
+    }
+
+    // This function is used as the callback from IOFetch.
+    virtual void operator()(IOFetch::Result result) {
+        // XXX is this the place for TCP retry?
+        --queries_out_;
+        if (!done_ && result != IOFetch::TIME_OUT) {
+            // we got an answer
+            Message incoming(Message::PARSE);
+            InputBuffer ibuf(buffer_->getData(), buffer_->getLength());
+            incoming.fromWire(ibuf);
+            isc::resolve::copyResponseMessage(incoming, answer_message_);
+            done_ = true;
+            stop(true);
+        } else if (!done_ && retries_--) {
+            // We timed out, but we have some retries, so send again
+            dlog("Timeout, resending query");
+            send();
+        } else {
+            // out of retries, give up for now
+            stop(false);
+        }
+    }
+};
 }
 
 void
@@ -516,12 +668,11 @@ RecursiveQuery::resolve(const QuestionPtr& question,
     } else {
         dlog("Message not found in cache, starting recursive query");
         // It will delete itself when it is done
-        RunningQuery* query = new RunningQuery(io, *question, answer_message,
-                                               upstream_, upstream_root_,
-                                               buffer, callback, query_timeout_,
-                                               client_timeout_, lookup_timeout_,
-                                               retries_, cache_);
-        query->doLookup();
+        new RunningQuery(io, *question, answer_message,
+                         upstream_, upstream_root_,
+                         buffer, callback, query_timeout_,
+                         client_timeout_, lookup_timeout_,
+                         retries_, cache_);
     }
 }
 
@@ -555,12 +706,11 @@ RecursiveQuery::resolve(const Question& question,
     } else {
         dlog("Message not found in cache, starting recursive query");
         // It will delete itself when it is done
-        RunningQuery* query = new RunningQuery(io, question, answer_message,
-                                               upstream_, upstream_root_,
-                                               buffer, crs, query_timeout_,
-                                               client_timeout_, lookup_timeout_,
-                                               retries_, cache_);
-        query->doLookup();
+        new RunningQuery(io, question, answer_message,
+                         upstream_, upstream_root_,
+                         buffer, crs, query_timeout_,
+                         client_timeout_, lookup_timeout_,
+                         retries_, cache_);
     }
 }
 
@@ -593,15 +743,12 @@ RecursiveQuery::forward(ConstMessagePtr query_message,
         answer_message->setRcode(Rcode::NOERROR());
         crs->success(answer_message);
     } else {
-        dlog("Message not found in cache, starting recursive query");
+        dlog("Message not found in cache, starting forward query");
         // It will delete itself when it is done
-        RunningQuery* query = new RunningQuery(io, *question, answer_message,
-                                               upstream_, upstream_root_,
-                                               buffer, crs, query_timeout_,
-                                               client_timeout_, lookup_timeout_,
-                                               retries_, cache_);
-        query->setQueryMessage(query_message);
-        query->doLookup();
+        new ForwardQuery(io, query_message, answer_message,
+                         upstream_, buffer, crs, query_timeout_,
+                         client_timeout_, lookup_timeout_,
+                         retries_, cache_);
     }
 }
 
diff --git a/src/lib/asiolink/recursive_query.h b/src/lib/asiolink/recursive_query.h
index 09f7c16..e3ac56c 100644
--- a/src/lib/asiolink/recursive_query.h
+++ b/src/lib/asiolink/recursive_query.h
@@ -99,16 +99,12 @@ public:
                  isc::dns::OutputBufferPtr buffer,
                  DNSServer* server);
 
-    /// \brief Initiates resolving for the given question.
+    /// \brief Initiates forwarding for the given query.
     ///
-    /// This actually calls the previous sendQuery() with a default
-    /// callback object, which calls resume() on the given DNSServer
-    /// object.
+    ///  The other parameters are the same as those of the
+    ///  function resolve().
     ///
-    /// \param question The question being answered <qname/qclass/qtype>
-    /// \param answer_message An output Message into which the final response will be copied
-    /// \param buffer An output buffer into which the intermediate responses will be copied
-    /// \param server A pointer to the \c DNSServer object handling the client
+    /// \param query_message the full query received from the client.
     void forward(isc::dns::ConstMessagePtr query_message,
                  isc::dns::MessagePtr answer_message,
                  isc::dns::OutputBufferPtr buffer,
diff --git a/src/lib/asiolink/tests/recursive_query_unittest.cc b/src/lib/asiolink/tests/recursive_query_unittest.cc
index 33cbbc6..9760db6 100644
--- a/src/lib/asiolink/tests/recursive_query_unittest.cc
+++ b/src/lib/asiolink/tests/recursive_query_unittest.cc
@@ -48,6 +48,7 @@
 #include <asiolink/io_error.h>
 #include <asiolink/dns_lookup.h>
 #include <asiolink/simple_callback.h>
+#include <resolve/resolve.h>
 
 using isc::UnitTestUtil;
 using namespace std;
@@ -549,9 +550,12 @@ TEST_F(RecursiveQueryTest, forwarderSend) {
                       singleAddress(TEST_IPV4_ADDR, port));
 
     Question q(Name("example.com"), RRClass::IN(), RRType::TXT());
+    Message query_message(Message::RENDER);
+    isc::resolve::initResponseMessage(q, query_message);
+
     OutputBufferPtr buffer(new OutputBuffer(0));
     MessagePtr answer(new Message(Message::RENDER));
-    rq.resolve(q, answer, buffer, &server);
+    rq.forward(ConstMessagePtr(&query_message), answer, buffer, &server);
 
     char data[4096];
     size_t size = sizeof(data);
@@ -607,7 +611,6 @@ bool tryRead(int sock_, int recv_options, size_t max, int* num) {
     do {
         char inbuff[512];
         if (recv(sock_, inbuff, sizeof(inbuff), recv_options) < 0) {
-            std::cout << "recv < 0===============================\n";
             return false;
         } else {
             ++i;
@@ -639,8 +642,10 @@ TEST_F(RecursiveQueryTest, forwardQueryTimeout) {
     Question question(Name("example.net"), RRClass::IN(), RRType::A());
     OutputBufferPtr buffer(new OutputBuffer(0));
     MessagePtr answer(new Message(Message::RENDER));
-    query.resolve(question, answer, buffer, &server);
 
+    Message query_message(Message::RENDER);
+    isc::resolve::initResponseMessage(question, query_message);
+    query.forward(ConstMessagePtr(&query_message), answer, buffer, &server);
     // Run the test
     io_service_->run();
 
@@ -683,7 +688,10 @@ TEST_F(RecursiveQueryTest, forwardClientTimeout) {
                          200, 480, 4000, 4);
     Question question(Name("example.net"), RRClass::IN(), RRType::A());
     OutputBufferPtr buffer(new OutputBuffer(0));
-    query.resolve(question, answer, buffer, &server);
+
+    Message query_message(Message::RENDER);
+    isc::resolve::initResponseMessage(question, query_message);
+    query.forward(ConstMessagePtr(&query_message), answer, buffer, &server);
 
     // Run the test
     io_service_->run();
@@ -727,7 +735,10 @@ TEST_F(RecursiveQueryTest, forwardLookupTimeout) {
                          200, 4000, 480, 5);
     Question question(Name("example.net"), RRClass::IN(), RRType::A());
     OutputBufferPtr buffer(new OutputBuffer(0));
-    query.resolve(question, answer, buffer, &server);
+
+    Message query_message(Message::RENDER);
+    isc::resolve::initResponseMessage(question, query_message);
+    query.forward(ConstMessagePtr(&query_message), answer, buffer, &server);
 
     // Run the test
     io_service_->run();




More information about the bind10-changes mailing list