BIND 10 trac2871, updated. 4a6b06c9e3faf38030823f2b213d0731f5526749 [2873] Make the queries slimmer

BIND 10 source code commits bind10-changes at lists.isc.org
Wed Jul 17 11:54:23 UTC 2013


The branch, trac2871 has been updated
       via  4a6b06c9e3faf38030823f2b213d0731f5526749 (commit)
       via  14538b9e516033b199fdee9ff69526cb6dc4c6cc (commit)
       via  beeb6473819bb9128c4d2d474a2acf41bf5c489d (commit)
       via  3e39a015cddffce0770f0908990f3d38cc748a65 (commit)
      from  bb69cd65951db7b3e4032ede474b81f4e876fbfc (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.

- Log -----------------------------------------------------------------
commit 4a6b06c9e3faf38030823f2b213d0731f5526749
Author: Michal 'vorner' Vaner <michal.vaner at nic.cz>
Date:   Wed Jul 17 13:45:13 2013 +0200

    [2873] Make the queries slimmer
    
    Make the queries more lightweight, but do more of them.

commit 14538b9e516033b199fdee9ff69526cb6dc4c6cc
Author: Michal 'vorner' Vaner <michal.vaner at nic.cz>
Date:   Wed Jul 17 13:43:51 2013 +0200

    [2873] Run the landlord in the benchmarks

commit beeb6473819bb9128c4d2d474a2acf41bf5c489d
Author: Michal 'vorner' Vaner <michal.vaner at nic.cz>
Date:   Wed Jul 17 13:42:58 2013 +0200

    [2871] The landlord synchronisation implementation

commit 3e39a015cddffce0770f0908990f3d38cc748a65
Author: Michal 'vorner' Vaner <michal.vaner at nic.cz>
Date:   Wed Jul 17 13:41:16 2013 +0200

    [2873] Allow to wake the interface main loop
    
    Make it possible to wake up the main loop of the fake interface from
    another thread while it is waiting for events. This signals that some
    internal event, which the loop itself cannot see, has happened.

-----------------------------------------------------------------------

Summary of changes:
 src/bin/resolver/bench/fake_resolution.cc |   68 ++++++++--
 src/bin/resolver/bench/fake_resolution.h  |   15 +++
 src/bin/resolver/bench/landlord.cc        |  206 ++++++++++++++++++++++++++++-
 src/bin/resolver/bench/landlord.h         |   25 +++-
 src/bin/resolver/bench/main.cc            |   18 ++-
 5 files changed, 313 insertions(+), 19 deletions(-)

-----------------------------------------------------------------------
diff --git a/src/bin/resolver/bench/fake_resolution.cc b/src/bin/resolver/bench/fake_resolution.cc
index e3d54a9..bde53aa 100644
--- a/src/bin/resolver/bench/fake_resolution.cc
+++ b/src/bin/resolver/bench/fake_resolution.cc
@@ -21,6 +21,9 @@
 #include <boost/foreach.hpp>
 #include <algorithm>
 #include <stdlib.h> // not cstdlib, which doesn't officially have random()
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <errno.h>
 
 namespace isc {
 namespace resolver {
@@ -28,11 +31,11 @@ namespace bench {
 
 // Parameters of the generated queries.
 // How much work is each operation?
-const size_t parse_size = 100000;
-const size_t render_size = 100000;
-const size_t send_size = 1000;
-const size_t cache_read_size = 10000;
-const size_t cache_write_size = 10000;
+const size_t parse_size = 10000;
+const size_t render_size = 10000;
+const size_t send_size = 100;
+const size_t cache_read_size = 1000;
+const size_t cache_write_size = 1000;
 // How large a change is to terminate in this iteration (either by getting
 // the complete answer, or by finding it in the cache). With 0.5, half the
 // queries are found in the cache directly. Half of the rest needs just one
@@ -40,8 +43,8 @@ const size_t cache_write_size = 10000;
 const float chance_complete = 0.5;
 // Number of milliseconds an upstream query can take. It picks a random number
 // in between.
-const size_t upstream_time_min = 2;
-const size_t upstream_time_max = 50;
+const size_t upstream_time_min = 1;
+const size_t upstream_time_max = 5;
 
 FakeQuery::FakeQuery(FakeInterface& interface) :
     interface_(&interface),
@@ -89,11 +92,22 @@ FakeQuery::performTask(const StepCallback& callback) {
 }
 
 FakeInterface::FakeInterface(size_t query_count) :
-    queries_(query_count)
+    queries_(query_count),
+    // This initialization of the file descriptors is not exactly exception
+    // safe, but this is a benchmark only, so we don't complicate the code.
+    read_pipe_(-1), write_pipe_(-1),
+    wake_socket_(service_, initSockets())
 {
     BOOST_FOREACH(FakeQueryPtr& query, queries_) {
         query = FakeQueryPtr(new FakeQuery(*this));
     }
+    // Call it on empty socket now, to register next async read.
+    readWakeup("");
+}
+
+FakeInterface::~ FakeInterface() { // Close both ends of the wakeup socket pair.
+    close(read_pipe_); // NOTE(review): read_pipe_ was also handed to wake_socket_; confirm LocalSocket does not close it too.
+    close(write_pipe_);
 }
 
 void
@@ -167,6 +181,44 @@ FakeInterface::scheduleUpstreamAnswer(FakeQuery* query,
     timer->setup(boost::bind(&UpstreamQuery::trigger, q), msec);
 }
 
+int
+FakeInterface::initSockets() { // Create the wakeup socket pair; returns the reading end.
+    int socks[2];
+    int result = socketpair(AF_UNIX, SOCK_STREAM, 0, socks);
+    assert(result == 0); // Checked by assert only; with NDEBUG a failure goes unnoticed.
+    read_pipe_ = socks[0];
+    write_pipe_ = socks[1];
+    return read_pipe_; // The constructor uses this to build wake_socket_.
+}
+
+void
+FakeInterface::wakeup() {
+    // We write a small bit of data to the wakeup socket. It'll generate an
+    // event in the mainloop of processEvents.
+    ssize_t result = send(write_pipe_, "w", 1, MSG_DONTWAIT);
+    // No errors, please.
+    // But blocking (full socket) is not considered an error. If it's full,
+    // the other side will wake up anyway, so that's OK.
+    assert(result == 1 || (errno == EAGAIN || errno == EWOULDBLOCK));
+}
+
+void
+FakeInterface::readWakeup(const std::string& error) {
+    assert(error.empty());
+    // Read some amount of data from the socket. May be more than the 1 byte
+    // we already read, batching some wakeups together.
+    const size_t batch_size = 1024;
+    uint8_t buffer[batch_size];
+    ssize_t result = recv(read_pipe_, buffer, batch_size,
+                          MSG_DONTWAIT /* Make sure we don't block even if
+                                          there's nothing (startup, spurious
+                                          select wakeup)*/);
+    assert(result > 0 || (errno == EAGAIN || errno == EWOULDBLOCK));
+    // Schedule next wakeup event
+    wake_socket_.asyncRead(boost::bind(&FakeInterface::readWakeup, this, _1),
+                           &wake_buffer_, 1);
+}
+
 }
 }
 }
diff --git a/src/bin/resolver/bench/fake_resolution.h b/src/bin/resolver/bench/fake_resolution.h
index cf2219c..d505872 100644
--- a/src/bin/resolver/bench/fake_resolution.h
+++ b/src/bin/resolver/bench/fake_resolution.h
@@ -17,6 +17,7 @@
 
 #include <exceptions/exceptions.h>
 #include <asiolink/io_service.h>
+#include <asiolink/local_socket.h>
 
 #include <boost/function.hpp>
 #include <boost/shared_ptr.hpp>
@@ -189,6 +190,8 @@ public:
     /// Initiarile the interface and create query_count queries for the
     /// benchmark. They will be handed out one by one with receiveQuery().
     FakeInterface(size_t query_count);
+    /// \brief Destructor
+    ~FakeInterface();
     /// \brief Wait for answers from upstream servers.
     ///
     /// Wait until at least one "answer" comes from the remote server. This
@@ -211,14 +214,26 @@ public:
     /// This returns a NULL pointer when there are no more queries to answer
     /// (the number designated for the benchmark was reached).
     FakeQueryPtr receiveQuery();
+    /// \brief Wake up from other thread.
+    ///
+    /// Make sure that processEvents() terminates now and won't block any more.
+    /// It may or may not produce some callbacks.
+    ///
+    /// This may be called from a different thread.
+    void wakeup();
 private:
     class UpstreamQuery;
     friend class FakeQuery;
     void scheduleUpstreamAnswer(FakeQuery* query,
                                 const FakeQuery::StepCallback& callback,
                                 size_t msec);
+    int initSockets();
+    void readWakeup(const std::string& error);
     asiolink::IOService service_;
     std::vector<FakeQueryPtr> queries_;
+    int read_pipe_, write_pipe_;
+    asiolink::LocalSocket wake_socket_;
+    char wake_buffer_;
 };
 
 }
diff --git a/src/bin/resolver/bench/landlord.cc b/src/bin/resolver/bench/landlord.cc
index 9d5c08c..7659c0d 100644
--- a/src/bin/resolver/bench/landlord.cc
+++ b/src/bin/resolver/bench/landlord.cc
@@ -12,9 +12,14 @@
 // OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
 // PERFORMANCE OF THIS SOFTWARE.
 
-#include <resolver/bench/naive_resolver.h>
+#include <resolver/bench/landlord.h>
 
 #include <util/threads/sync.h>
+#include <util/threads/thread.h>
+
+#include <boost/bind.hpp>
+#include <boost/foreach.hpp>
+#include <boost/shared_ptr.hpp>
 
 #include <vector>
 #include <algorithm>
@@ -22,11 +27,14 @@
 using std::vector;
 using isc::util::thread::CondVar;
 using isc::util::thread::Mutex;
+using isc::util::thread::Thread;
 
-namespace {
+namespace isc {
+namespace resolver {
+namespace bench {
 
-template<class T, class Container = std::vector<T> >
-class Queue {
+template<class T, class Container>
+class LandlordResolver::Queue {
 public:
     // Constructor.
     Queue() :
@@ -42,6 +50,9 @@ public:
     }
     // Push multiple values (and wake up if something is waiting on pop()).
     void push(const Values& values) {
+        if (values.empty()) {
+            return; // NOP
+        }
         // Copy the data into the queue
         Mutex::Locker locker(mutex_);
         values_.insert(values_.end(), values.begin(), values.end());
@@ -74,11 +85,192 @@ private:
     bool shutdown_;
 };
 
+LandlordResolver::LandlordResolver(size_t count, size_t batch_size,
+                                   size_t worker_count) :
+    interface_(count),
+    peasants_queue_(new FQueue),
+    cache_queue_(new FQueue),
+    send_queue_(new FQueue),
+    upstream_queue_(new FQueue),
+    read_batch_size_(batch_size),
+    worker_batch_(batch_size),
+    worker_count_(worker_count),
+    total_count_(count),
+    outstanding_(0)
+{ }
+
+// Destructor. Defined here, where Queue is a complete type, so that the
+// scoped_ptr members can destroy the queues (the header only declares Queue).
+LandlordResolver::~LandlordResolver() {}
+
+void
+LandlordResolver::completedUpstream(FakeQueryPtr query) {
+    outstanding_ -= 1;
+    downstream_queue_.push_back(query);
 }
 
-namespace isc {
-namespace resolver {
-namespace bench {
+bool
+LandlordResolver::checkUpstream(bool block) {
+    // Should we check for some incoming queries?
+    // Only if we may block and if there are some expected.
+    if (block && outstanding_ > 0) {
+        interface_.processEvents();
+        block = false; // We already blocked (possibly)
+    }
+    // Get the queries to be sent out and do so
+    vector<FakeQueryPtr> out;
+    out.reserve(worker_batch_);
+    bool running = upstream_queue_->pop(out, worker_batch_, block);
+    BOOST_FOREACH(const FakeQueryPtr& q, out) {
+        assert(q->nextTask() == Upstream);
+        q->performTask(boost::bind(&LandlordResolver::completedUpstream,
+                                   this, q));
+    }
+    outstanding_ += out.size();
+    if (out.size() == worker_batch_) {
+        // We handled a full batch. There may be more, handle them now.
+        return checkUpstream(false);
+    }
+    // Send all received queries to the workers for processing
+    peasants_queue_->push(downstream_queue_);
+    downstream_queue_.clear();
+    // The result.
+    return running || !out.empty();
+}
+
+void
+LandlordResolver::read_iface() {
+    vector<FakeQueryPtr> queries;
+    queries.reserve(read_batch_size_);
+    FakeQueryPtr q;
+    while ((q = interface_.receiveQuery())) {
+        queries.push_back(q);
+        if (queries.size() >= read_batch_size_) {
+            peasants_queue_->push(queries);
+            queries.clear();
+            queries.reserve(read_batch_size_);
+        }
+        checkUpstream(false);
+    }
+    peasants_queue_->push(queries);
+    while (checkUpstream(true)) {}
+}
+
+namespace {
+
+void stepDone(bool *flag) {
+    *flag = true;
+}
+
+}
+
+void
+LandlordResolver::work() {
+    vector<FakeQueryPtr> queries, writes, sends, upstreams;
+    queries.reserve(worker_batch_);
+    writes.reserve(worker_batch_);
+    sends.reserve(worker_batch_);
+    upstreams.reserve(worker_batch_);
+    while (peasants_queue_->pop(queries, worker_batch_) || !queries.empty()) {
+        BOOST_FOREACH(const FakeQueryPtr& q, queries) {
+            while (true) {
+                switch (q->nextTask()) {
+                    case Compute: // Workers do computations
+                    case CacheRead: {// With RCU, there'd be lock-less read, so workers can do that too.
+                        bool done = false;
+                        q->performTask(boost::bind(&stepDone, &done));
+                        assert(done); // There should be nothing that should wait
+                        assert(!q->done()); // Queries are done once Send is done, which is elsewhere
+                        break; // Try next task
+                    }
+                    // These tasks are delegated to the landlords
+                    case CacheWrite:
+                        writes.push_back(q);
+                        goto WORK_DONE;
+                    case Send:
+                        sends.push_back(q);
+                        goto WORK_DONE;
+                    case Upstream:
+                        upstreams.push_back(q);
+                        goto WORK_DONE;
+                }
+            }
+            WORK_DONE:;
+        }
+        queries.clear();
+        cache_queue_->push(writes);
+        writes.clear();
+        send_queue_->push(sends);
+        sends.clear();
+        upstream_queue_->push(upstreams);
+        if (!upstreams.empty()) {
+            // Wake up the interface main loop if it was sleeping.
+            interface_.wakeup();
+        }
+        upstreams.clear();
+    }
+}
+
+void
+LandlordResolver::cache() {
+    vector<FakeQueryPtr> queries;
+    queries.reserve(worker_batch_);
+    while (cache_queue_->pop(queries, worker_batch_) || !queries.empty()) {
+        BOOST_FOREACH(const FakeQueryPtr& q, queries) {
+            assert(q->nextTask() == CacheWrite);
+            bool done = false;
+            q->performTask(boost::bind(&stepDone, &done));
+            assert(done); // Cache write is synchronous
+        }
+        // Put them all to the workers again to process
+        peasants_queue_->push(queries);
+        queries.clear();
+    }
+}
+
+void
+LandlordResolver::send() {
+    vector<FakeQueryPtr> queries;
+    queries.reserve(worker_batch_);
+    size_t count = 0;
+    while (count < total_count_) {
+        send_queue_->pop(queries, worker_batch_);
+        BOOST_FOREACH(const FakeQueryPtr& q, queries) {
+            assert(q->nextTask() == Send);
+            bool done = false;
+            q->performTask(boost::bind(&stepDone, &done));
+            assert(done); // Send is synchronous
+            assert(q->done());
+            count ++;
+        }
+        queries.clear();
+    }
+    // Every query was sent to the user, so shut down all the queues
+    peasants_queue_->shutdown();
+    cache_queue_->shutdown();
+    send_queue_->shutdown();
+    upstream_queue_->shutdown();
+}
+
+size_t
+LandlordResolver::run() {
+    vector<Thread *> threads;
+    threads.push_back(
+        new Thread(boost::bind(&LandlordResolver::read_iface, this)));
+    for (size_t i = 0; i < worker_count_; ++i) {
+        threads.push_back(new Thread(boost::bind(&LandlordResolver::work,
+                                                 this)));
+    }
+    threads.push_back(new Thread(boost::bind(&LandlordResolver::cache, this)));
+    threads.push_back(new Thread(boost::bind(&LandlordResolver::send, this)));
+
+    // Wait for all threads to terminate
+    BOOST_FOREACH(Thread *t, threads) {
+        t->wait();
+        delete t;
+    }
+    return (total_count_);
+}
 
 }
 }
diff --git a/src/bin/resolver/bench/landlord.h b/src/bin/resolver/bench/landlord.h
index c3fbb84..d7c4da8 100644
--- a/src/bin/resolver/bench/landlord.h
+++ b/src/bin/resolver/bench/landlord.h
@@ -17,6 +17,8 @@
 
 #include <resolver/bench/fake_resolution.h>
 
+#include <boost/scoped_ptr.hpp>
+
 namespace isc {
 namespace resolver {
 namespace bench {
@@ -29,16 +31,35 @@ namespace bench {
 class LandlordResolver {
 public:
     /// \brief Constructor. Initializes the data.
-    LandlordResolver(size_t query_count);
+    LandlordResolver(size_t query_count, size_t batch_size,
+                     size_t worker_count);
+    ~ LandlordResolver();
     /// \brief Run the resolution.
     size_t run();
 private:
+    void read_iface();
+    void work();
+    void cache();
+    void send();
+    bool checkUpstream(bool block);
+    void completedUpstream(FakeQueryPtr query);
+
     FakeInterface interface_;
+
+    template<class T, class Container = std::vector<T> > class Queue;
+    typedef Queue<FakeQueryPtr> FQueue;
+    boost::scoped_ptr<FQueue> peasants_queue_, cache_queue_,
+        send_queue_, upstream_queue_;
+    std::vector<FakeQueryPtr> downstream_queue_;
+
+    const size_t read_batch_size_, worker_batch_;
+    const size_t worker_count_;
+    const size_t total_count_;
+    size_t outstanding_;
 };
 
 }
 }
 }
 
-
 #endif
diff --git a/src/bin/resolver/bench/main.cc b/src/bin/resolver/bench/main.cc
index 3007c40..0b77421 100644
--- a/src/bin/resolver/bench/main.cc
+++ b/src/bin/resolver/bench/main.cc
@@ -13,14 +13,28 @@
 // PERFORMANCE OF THIS SOFTWARE.
 
 #include <resolver/bench/naive_resolver.h>
+#include <resolver/bench/landlord.h>
 
 #include <bench/benchmark.h>
 
-const size_t count = 1000; // TODO: We may want to read this from argv.
+#include <iostream>
+
+const size_t count = 10000; // TODO: We may want to read this from argv.
+
+using namespace std;
 
 int main(int, const char**) {
+    for (size_t i = 1; i < 10; i ++) {
+        for (size_t j = 1; j < 10; j ++) {
+            size_t size = j * 25;
+            cout << "Landlord with " << i << " workers and " << size << " batch size " << endl;
+            isc::resolver::bench::LandlordResolver landlord(::count, size, i);
+            isc::bench::BenchMark<isc::resolver::bench::LandlordResolver>
+                (1, landlord, true);
+        }
+    }
     // Run the naive implementation
-    isc::resolver::bench::NaiveResolver naive_resolver(count);
+    isc::resolver::bench::NaiveResolver naive_resolver(::count);
     isc::bench::BenchMark<isc::resolver::bench::NaiveResolver>
         (1, naive_resolver, true);
     return 0;



More information about the bind10-changes mailing list