[svn] commit: r1661 - in /branches/trac58/src: bin/auth/main.cc lib/cc/session.cc lib/cc/session.h lib/config/ccsession.cc

BIND 10 source code commits bind10-changes at lists.isc.org
Tue Mar 30 15:37:34 UTC 2010


Author: jelte
Date: Tue Mar 30 15:37:33 2010
New Revision: 1661

Log:
implemented queue and hasQueuedMsgs() similar to the python version

Modified:
    branches/trac58/src/bin/auth/main.cc
    branches/trac58/src/lib/cc/session.cc
    branches/trac58/src/lib/cc/session.h
    branches/trac58/src/lib/config/ccsession.cc

Modified: branches/trac58/src/bin/auth/main.cc
==============================================================================
--- branches/trac58/src/bin/auth/main.cc (original)
+++ branches/trac58/src/bin/auth/main.cc Tue Mar 30 15:37:33 2010
@@ -664,6 +664,9 @@
         FD_SET(ss, &fds);
         ++nfds;
 
+        if (srv->configSession()->hasQueuedMsgs()) {
+            srv->configSession()->checkCommand();
+        }
         int n = select(nfds, &fds, NULL, NULL, NULL);
         if (n < 0) {
             if (errno != EINTR) {

Modified: branches/trac58/src/lib/cc/session.cc
==============================================================================
--- branches/trac58/src/lib/cc/session.cc (original)
+++ branches/trac58/src/lib/cc/session.cc Tue Mar 30 15:37:33 2010
@@ -54,7 +54,7 @@
 
 class SessionImpl {
 public:
-    SessionImpl() : sequence_(-1) {}
+    SessionImpl() : sequence_(-1) { queue_ = Element::createFromString("[]"); }
     virtual ~SessionImpl() {}
     virtual void establish() = 0; 
     virtual int getSocket() = 0;
@@ -63,9 +63,10 @@
     virtual size_t readDataLength() = 0;
     virtual void readData(void* data, size_t datalen) = 0;
     virtual void startRead(boost::function<void()> user_handler) = 0;
-
+    
     int sequence_; // the next sequence number to use
     std::string lname_;
+    ElementPtr queue_;
 };
 
 #ifdef HAVE_BOOST_SYSTEM
@@ -352,35 +353,38 @@
 }
 
 bool
-Session::recvmsg(ElementPtr& msg, bool nonblock UNUSED_PARAM) {
+Session::recvmsg(ElementPtr& msg, bool nonblock, int seq) {
+    ElementPtr l_env;
+    return recvmsg(l_env, msg, nonblock, seq);
+}
+
+bool
+Session::recvmsg(ElementPtr& env, ElementPtr& msg,
+                 bool nonblock, int seq) {
     size_t length = impl_->readDataLength();
-
-    unsigned short header_length_net;
-    impl_->readData(&header_length_net, sizeof(header_length_net));
-
-    unsigned short header_length = ntohs(header_length_net);
-    if (header_length != length) {
-        isc_throw(SessionError, "Length parameters invalid: total=" << length
-                  << ", header=" << header_length);
-    }
-
-    std::vector<char> buffer(length);
-    impl_->readData(&buffer[0], length);
-
-    std::string wire = std::string(&buffer[0], length);
-    std::stringstream wire_stream;
-    wire_stream << wire;
-
-    msg = Element::fromWire(wire_stream, length);
-
-    return (true);
-    // XXXMLG handle non-block here, and return false for short reads
-}
-
-bool
-Session::recvmsg(ElementPtr& env, ElementPtr& msg, bool nonblock UNUSED_PARAM) {
-    size_t length = impl_->readDataLength();
-
+    ElementPtr l_env, l_msg;
+    if (hasQueuedMsgs()) {
+        if (seq == -1) {
+            env = impl_->queue_->get(0)->get(0);
+            msg = impl_->queue_->get(0)->get(1);
+            impl_->queue_->remove(0);
+            return true;
+        } else {
+            ElementPtr q_el;
+            for (int i = 0; i < impl_->queue_->size(); i++) {
+                q_el = impl_->queue_->get(i);
+                if (q_el->get(0)->contains("reply") &&
+                    q_el->get(0)->get("reply")->intValue() == seq
+                   ) {
+                       env = q_el->get(0);
+                       msg = q_el->get(1);
+                       impl_->queue_->remove(i);
+                       return true;
+                }
+            }
+        }
+    }
+    
     unsigned short header_length_net;
     impl_->readData(&header_length_net, sizeof(header_length_net));
 
@@ -400,13 +404,26 @@
                                         length - header_length);
     std::stringstream header_wire_stream;
     header_wire_stream << header_wire;
-    env = Element::fromWire(header_wire_stream, header_length);
+    l_env = Element::fromWire(header_wire_stream, header_length);
     
     std::stringstream body_wire_stream;
     body_wire_stream << body_wire;
-    msg = Element::fromWire(body_wire_stream, length - header_length);
-
-    return (true);
+    l_msg = Element::fromWire(body_wire_stream, length - header_length);
+    if (seq == -1 ||
+        (l_env->contains("reply") &&
+         l_env->get("reply")->intValue() == seq
+        )
+       ) {
+        env = l_env;
+        msg = l_msg;
+        return true;
+    } else {
+        ElementPtr q_el = Element::createFromString("[]");
+        q_el->add(l_env);
+        q_el->add(l_msg);
+        impl_->queue_->add(q_el);
+        return recvmsg(env, msg, nonblock, seq);
+    }
     // XXXMLG handle non-block here, and return false for short reads
 }
 
@@ -432,47 +449,55 @@
     sendmsg(env);
 }
 
-unsigned int
+int
 Session::group_sendmsg(ElementPtr msg, std::string group,
                        std::string instance, std::string to)
 {
     ElementPtr env = Element::create(std::map<std::string, ElementPtr>());
-
+    int nseq = ++impl_->sequence_;
+    
     env->set("type", Element::create("send"));
     env->set("from", Element::create(impl_->lname_));
     env->set("to", Element::create(to));
     env->set("group", Element::create(group));
     env->set("instance", Element::create(instance));
-    env->set("seq", Element::create(impl_->sequence_));
+    env->set("seq", Element::create(nseq));
     //env->set("msg", Element::create(msg->toWire()));
 
     sendmsg(env, msg);
-
-    return (++impl_->sequence_);
+    return nseq;
 }
 
 bool
 Session::group_recvmsg(ElementPtr& envelope, ElementPtr& msg,
-                       bool nonblock)
+                       bool nonblock, int seq)
 {
-    return (recvmsg(envelope, msg, nonblock));
-}
-
-unsigned int
+    return (recvmsg(envelope, msg, nonblock, seq));
+}
+
+int
 Session::reply(ElementPtr& envelope, ElementPtr& newmsg) {
     ElementPtr env = Element::create(std::map<std::string, ElementPtr>());
-
+    int nseq = ++impl_->sequence_;
+    
     env->set("type", Element::create("send"));
     env->set("from", Element::create(impl_->lname_));
     env->set("to", Element::create(envelope->get("from")->stringValue()));
     env->set("group", Element::create(envelope->get("group")->stringValue()));
     env->set("instance", Element::create(envelope->get("instance")->stringValue()));
-    env->set("seq", Element::create(impl_->sequence_));
+    env->set("seq", Element::create(nseq));
     env->set("reply", Element::create(envelope->get("seq")->intValue()));
 
     sendmsg(env, newmsg);
 
-    return (++impl_->sequence_);
-}
-}
-}
+    return nseq;
+}
+
+bool
+Session::hasQueuedMsgs()
+{
+    return (impl_->queue_->size() > 0);
+}
+
+}
+}

Modified: branches/trac58/src/lib/cc/session.h
==============================================================================
--- branches/trac58/src/lib/cc/session.h (original)
+++ branches/trac58/src/lib/cc/session.h Tue Mar 30 15:37:33 2010
@@ -65,23 +65,27 @@
             void sendmsg(isc::data::ElementPtr& env,
                          isc::data::ElementPtr& msg);
             bool recvmsg(isc::data::ElementPtr& msg,
-                         bool nonblock = true);
+                         bool nonblock = true,
+                         int seq = -1);
             bool recvmsg(isc::data::ElementPtr& env,
                          isc::data::ElementPtr& msg,
-                         bool nonblock = true);
+                         bool nonblock = true,
+                         int seq = -1);
             void subscribe(std::string group,
                            std::string instance = "*");
             void unsubscribe(std::string group,
                              std::string instance = "*");
-            unsigned int group_sendmsg(isc::data::ElementPtr msg,
+            int group_sendmsg(isc::data::ElementPtr msg,
                                        std::string group,
                                        std::string instance = "*",
                                        std::string to = "*");
             bool group_recvmsg(isc::data::ElementPtr& envelope,
                                isc::data::ElementPtr& msg,
-                               bool nonblock = true);
-            unsigned int reply(isc::data::ElementPtr& envelope,
+                               bool nonblock = true,
+                               int seq = -1);
+            int reply(isc::data::ElementPtr& envelope,
                                isc::data::ElementPtr& newmsg);
+            bool hasQueuedMsgs();
         };
     } // namespace cc
 } // namespace isc

Modified: branches/trac58/src/lib/config/ccsession.cc
==============================================================================
--- branches/trac58/src/lib/config/ccsession.cc (original)
+++ branches/trac58/src/lib/config/ccsession.cc Tue Mar 30 15:37:33 2010
@@ -248,8 +248,8 @@
     //session_.subscribe("statistics", "*");
     // send the data specification
     ElementPtr spec_msg = createCommand("module_spec", module_specification_.getFullSpec());
-    session_.group_sendmsg(spec_msg, "ConfigManager");
-    session_.group_recvmsg(env, answer, false);
+    unsigned int seq = session_.group_sendmsg(spec_msg, "ConfigManager");
+    session_.group_recvmsg(env, answer, false, seq);
     int rcode;
     ElementPtr err = parseAnswer(rcode, answer);
     if (rcode != 0) {
@@ -260,8 +260,8 @@
     // get any stored configuration from the manager
     if (config_handler_) {
         ElementPtr cmd = Element::createFromString("{ \"command\": [\"get_config\", {\"module_name\":\"" + module_name_ + "\"} ] }");
-        session_.group_sendmsg(cmd, "ConfigManager");
-        session_.group_recvmsg(env, answer, false);
+        seq = session_.group_sendmsg(cmd, "ConfigManager");
+        session_.group_recvmsg(env, answer, false, seq);
         ElementPtr new_config = parseAnswer(rcode, answer);
         if (rcode == 0) {
             handleConfigUpdate(new_config);
@@ -365,8 +365,8 @@
     ElementPtr env, answer;
     int rcode;
     
-    session_.group_sendmsg(cmd, "ConfigManager");
-    session_.group_recvmsg(env, answer, false);
+    unsigned int seq = session_.group_sendmsg(cmd, "ConfigManager");
+    session_.group_recvmsg(env, answer, false, seq);
     ElementPtr new_config = parseAnswer(rcode, answer);
     if (rcode == 0) {
         rmod_config.setLocalConfig(new_config);




More information about the bind10-changes mailing list