[svn] commit: r1661 - in /branches/trac58/src: bin/auth/main.cc lib/cc/session.cc lib/cc/session.h lib/config/ccsession.cc
BIND 10 source code commits
bind10-changes at lists.isc.org
Tue Mar 30 15:37:34 UTC 2010
Author: jelte
Date: Tue Mar 30 15:37:33 2010
New Revision: 1661
Log:
implemented queue and hasQueuedMsgs() similar to the python version
Modified:
branches/trac58/src/bin/auth/main.cc
branches/trac58/src/lib/cc/session.cc
branches/trac58/src/lib/cc/session.h
branches/trac58/src/lib/config/ccsession.cc
Modified: branches/trac58/src/bin/auth/main.cc
==============================================================================
--- branches/trac58/src/bin/auth/main.cc (original)
+++ branches/trac58/src/bin/auth/main.cc Tue Mar 30 15:37:33 2010
@@ -664,6 +664,9 @@
FD_SET(ss, &fds);
++nfds;
+ if (srv->configSession()->hasQueuedMsgs()) {
+ srv->configSession()->checkCommand();
+ }
int n = select(nfds, &fds, NULL, NULL, NULL);
if (n < 0) {
if (errno != EINTR) {
Modified: branches/trac58/src/lib/cc/session.cc
==============================================================================
--- branches/trac58/src/lib/cc/session.cc (original)
+++ branches/trac58/src/lib/cc/session.cc Tue Mar 30 15:37:33 2010
@@ -54,7 +54,7 @@
class SessionImpl {
public:
- SessionImpl() : sequence_(-1) {}
+ SessionImpl() : sequence_(-1) { queue_ = Element::createFromString("[]"); }
virtual ~SessionImpl() {}
virtual void establish() = 0;
virtual int getSocket() = 0;
@@ -63,9 +63,10 @@
virtual size_t readDataLength() = 0;
virtual void readData(void* data, size_t datalen) = 0;
virtual void startRead(boost::function<void()> user_handler) = 0;
-
+
int sequence_; // the next sequence number to use
std::string lname_;
+ ElementPtr queue_;
};
#ifdef HAVE_BOOST_SYSTEM
@@ -352,35 +353,38 @@
}
bool
-Session::recvmsg(ElementPtr& msg, bool nonblock UNUSED_PARAM) {
+Session::recvmsg(ElementPtr& msg, bool nonblock, int seq) {
+ ElementPtr l_env;
+ return recvmsg(l_env, msg, nonblock, seq);
+}
+
+bool
+Session::recvmsg(ElementPtr& env, ElementPtr& msg,
+ bool nonblock, int seq) {
size_t length = impl_->readDataLength();
-
- unsigned short header_length_net;
- impl_->readData(&header_length_net, sizeof(header_length_net));
-
- unsigned short header_length = ntohs(header_length_net);
- if (header_length != length) {
- isc_throw(SessionError, "Length parameters invalid: total=" << length
- << ", header=" << header_length);
- }
-
- std::vector<char> buffer(length);
- impl_->readData(&buffer[0], length);
-
- std::string wire = std::string(&buffer[0], length);
- std::stringstream wire_stream;
- wire_stream << wire;
-
- msg = Element::fromWire(wire_stream, length);
-
- return (true);
- // XXXMLG handle non-block here, and return false for short reads
-}
-
-bool
-Session::recvmsg(ElementPtr& env, ElementPtr& msg, bool nonblock UNUSED_PARAM) {
- size_t length = impl_->readDataLength();
-
+ ElementPtr l_env, l_msg;
+ if (hasQueuedMsgs()) {
+ if (seq == -1) {
+ env = impl_->queue_->get(0)->get(0);
+ msg = impl_->queue_->get(0)->get(1);
+ impl_->queue_->remove(0);
+ return true;
+ } else {
+ ElementPtr q_el;
+ for (int i = 0; i < impl_->queue_->size(); i++) {
+ q_el = impl_->queue_->get(i);
+ if (q_el->get(0)->contains("reply") &&
+ q_el->get(0)->get("reply")->intValue() == seq
+ ) {
+ env = q_el->get(0);
+ msg = q_el->get(1);
+ impl_->queue_->remove(i);
+ return true;
+ }
+ }
+ }
+ }
+
unsigned short header_length_net;
impl_->readData(&header_length_net, sizeof(header_length_net));
@@ -400,13 +404,26 @@
length - header_length);
std::stringstream header_wire_stream;
header_wire_stream << header_wire;
- env = Element::fromWire(header_wire_stream, header_length);
+ l_env = Element::fromWire(header_wire_stream, header_length);
std::stringstream body_wire_stream;
body_wire_stream << body_wire;
- msg = Element::fromWire(body_wire_stream, length - header_length);
-
- return (true);
+ l_msg = Element::fromWire(body_wire_stream, length - header_length);
+ if (seq == -1 ||
+ (l_env->contains("reply") &&
+ l_env->get("reply")->intValue() == seq
+ )
+ ) {
+ env = l_env;
+ msg = l_msg;
+ return true;
+ } else {
+ ElementPtr q_el = Element::createFromString("[]");
+ q_el->add(l_env);
+ q_el->add(l_msg);
+ impl_->queue_->add(q_el);
+ return recvmsg(env, msg, nonblock, seq);
+ }
// XXXMLG handle non-block here, and return false for short reads
}
@@ -432,47 +449,55 @@
sendmsg(env);
}
-unsigned int
+int
Session::group_sendmsg(ElementPtr msg, std::string group,
std::string instance, std::string to)
{
ElementPtr env = Element::create(std::map<std::string, ElementPtr>());
-
+ int nseq = ++impl_->sequence_;
+
env->set("type", Element::create("send"));
env->set("from", Element::create(impl_->lname_));
env->set("to", Element::create(to));
env->set("group", Element::create(group));
env->set("instance", Element::create(instance));
- env->set("seq", Element::create(impl_->sequence_));
+ env->set("seq", Element::create(nseq));
//env->set("msg", Element::create(msg->toWire()));
sendmsg(env, msg);
-
- return (++impl_->sequence_);
+ return nseq;
}
bool
Session::group_recvmsg(ElementPtr& envelope, ElementPtr& msg,
- bool nonblock)
+ bool nonblock, int seq)
{
- return (recvmsg(envelope, msg, nonblock));
-}
-
-unsigned int
+ return (recvmsg(envelope, msg, nonblock, seq));
+}
+
+int
Session::reply(ElementPtr& envelope, ElementPtr& newmsg) {
ElementPtr env = Element::create(std::map<std::string, ElementPtr>());
-
+ int nseq = ++impl_->sequence_;
+
env->set("type", Element::create("send"));
env->set("from", Element::create(impl_->lname_));
env->set("to", Element::create(envelope->get("from")->stringValue()));
env->set("group", Element::create(envelope->get("group")->stringValue()));
env->set("instance", Element::create(envelope->get("instance")->stringValue()));
- env->set("seq", Element::create(impl_->sequence_));
+ env->set("seq", Element::create(nseq));
env->set("reply", Element::create(envelope->get("seq")->intValue()));
sendmsg(env, newmsg);
- return (++impl_->sequence_);
-}
-}
-}
+ return nseq;
+}
+
+bool
+Session::hasQueuedMsgs()
+{
+ return (impl_->queue_->size() > 0);
+}
+
+}
+}
Modified: branches/trac58/src/lib/cc/session.h
==============================================================================
--- branches/trac58/src/lib/cc/session.h (original)
+++ branches/trac58/src/lib/cc/session.h Tue Mar 30 15:37:33 2010
@@ -65,23 +65,27 @@
void sendmsg(isc::data::ElementPtr& env,
isc::data::ElementPtr& msg);
bool recvmsg(isc::data::ElementPtr& msg,
- bool nonblock = true);
+ bool nonblock = true,
+ int seq = -1);
bool recvmsg(isc::data::ElementPtr& env,
isc::data::ElementPtr& msg,
- bool nonblock = true);
+ bool nonblock = true,
+ int seq = -1);
void subscribe(std::string group,
std::string instance = "*");
void unsubscribe(std::string group,
std::string instance = "*");
- unsigned int group_sendmsg(isc::data::ElementPtr msg,
+ int group_sendmsg(isc::data::ElementPtr msg,
std::string group,
std::string instance = "*",
std::string to = "*");
bool group_recvmsg(isc::data::ElementPtr& envelope,
isc::data::ElementPtr& msg,
- bool nonblock = true);
- unsigned int reply(isc::data::ElementPtr& envelope,
+ bool nonblock = true,
+ int seq = -1);
+ int reply(isc::data::ElementPtr& envelope,
isc::data::ElementPtr& newmsg);
+ bool hasQueuedMsgs();
};
} // namespace cc
} // namespace isc
Modified: branches/trac58/src/lib/config/ccsession.cc
==============================================================================
--- branches/trac58/src/lib/config/ccsession.cc (original)
+++ branches/trac58/src/lib/config/ccsession.cc Tue Mar 30 15:37:33 2010
@@ -248,8 +248,8 @@
//session_.subscribe("statistics", "*");
// send the data specification
ElementPtr spec_msg = createCommand("module_spec", module_specification_.getFullSpec());
- session_.group_sendmsg(spec_msg, "ConfigManager");
- session_.group_recvmsg(env, answer, false);
+ unsigned int seq = session_.group_sendmsg(spec_msg, "ConfigManager");
+ session_.group_recvmsg(env, answer, false, seq);
int rcode;
ElementPtr err = parseAnswer(rcode, answer);
if (rcode != 0) {
@@ -260,8 +260,8 @@
// get any stored configuration from the manager
if (config_handler_) {
ElementPtr cmd = Element::createFromString("{ \"command\": [\"get_config\", {\"module_name\":\"" + module_name_ + "\"} ] }");
- session_.group_sendmsg(cmd, "ConfigManager");
- session_.group_recvmsg(env, answer, false);
+ seq = session_.group_sendmsg(cmd, "ConfigManager");
+ session_.group_recvmsg(env, answer, false, seq);
ElementPtr new_config = parseAnswer(rcode, answer);
if (rcode == 0) {
handleConfigUpdate(new_config);
@@ -365,8 +365,8 @@
ElementPtr env, answer;
int rcode;
- session_.group_sendmsg(cmd, "ConfigManager");
- session_.group_recvmsg(env, answer, false);
+ unsigned int seq = session_.group_sendmsg(cmd, "ConfigManager");
+ session_.group_recvmsg(env, answer, false, seq);
ElementPtr new_config = parseAnswer(rcode, answer);
if (rcode == 0) {
rmod_config.setLocalConfig(new_config);
More information about the bind10-changes
mailing list