[svn] commit: r1870 - in /trunk/src: bin/auth/ lib/cc/ lib/config/ lib/config/tests/ lib/python/isc/cc/ lib/python/isc/cc/tests/
BIND 10 source code commits
bind10-changes at lists.isc.org
Thu May 20 10:34:27 UTC 2010
Author: jelte
Date: Thu May 20 10:34:26 2010
New Revision: 1870
Log:
merge branches/trac58
(message queuing on cc channel)
see also http://bind10.isc.org/ticket/58
Modified:
trunk/src/bin/auth/main.cc
trunk/src/lib/cc/session.cc
trunk/src/lib/cc/session.h
trunk/src/lib/config/ccsession.cc
trunk/src/lib/config/ccsession.h
trunk/src/lib/config/tests/Makefile.am
trunk/src/lib/config/tests/fake_session.cc
trunk/src/lib/config/tests/fake_session.h
trunk/src/lib/python/isc/cc/session.py
trunk/src/lib/python/isc/cc/tests/session_test.py
Modified: trunk/src/bin/auth/main.cc
==============================================================================
--- trunk/src/bin/auth/main.cc (original)
+++ trunk/src/bin/auth/main.cc Thu May 20 10:34:26 2010
@@ -664,6 +664,9 @@
FD_SET(ss, &fds);
++nfds;
+ if (srv->configSession()->hasQueuedMsgs()) {
+ srv->configSession()->checkCommand();
+ }
int n = select(nfds, &fds, NULL, NULL, NULL);
if (n < 0) {
if (errno != EINTR) {
Modified: trunk/src/lib/cc/session.cc
==============================================================================
--- trunk/src/lib/cc/session.cc (original)
+++ trunk/src/lib/cc/session.cc Thu May 20 10:34:26 2010
@@ -54,7 +54,7 @@
class SessionImpl {
public:
- SessionImpl() : sequence_(-1) {}
+ SessionImpl() : sequence_(-1) { queue_ = Element::createFromString("[]"); }
virtual ~SessionImpl() {}
virtual void establish() = 0;
virtual int getSocket() = 0;
@@ -63,9 +63,10 @@
virtual size_t readDataLength() = 0;
virtual void readData(void* data, size_t datalen) = 0;
virtual void startRead(boost::function<void()> user_handler) = 0;
-
+
int sequence_; // the next sequence number to use
std::string lname_;
+ ElementPtr queue_;
};
#ifdef HAVE_BOOST_SYSTEM
@@ -352,35 +353,35 @@
}
bool
-Session::recvmsg(ElementPtr& msg, bool nonblock UNUSED_PARAM) {
+Session::recvmsg(ElementPtr& msg, bool nonblock, int seq) {
+ ElementPtr l_env;
+ return recvmsg(l_env, msg, nonblock, seq);
+}
+
+bool
+Session::recvmsg(ElementPtr& env, ElementPtr& msg,
+ bool nonblock, int seq) {
size_t length = impl_->readDataLength();
-
- unsigned short header_length_net;
- impl_->readData(&header_length_net, sizeof(header_length_net));
-
- unsigned short header_length = ntohs(header_length_net);
- if (header_length != length) {
- isc_throw(SessionError, "Length parameters invalid: total=" << length
- << ", header=" << header_length);
- }
-
- std::vector<char> buffer(length);
- impl_->readData(&buffer[0], length);
-
- std::string wire = std::string(&buffer[0], length);
- std::stringstream wire_stream;
- wire_stream << wire;
-
- msg = Element::fromWire(wire_stream, length);
-
- return (true);
- // XXXMLG handle non-block here, and return false for short reads
-}
-
-bool
-Session::recvmsg(ElementPtr& env, ElementPtr& msg, bool nonblock UNUSED_PARAM) {
- size_t length = impl_->readDataLength();
-
+ ElementPtr l_env, l_msg;
+ if (hasQueuedMsgs()) {
+ ElementPtr q_el;
+ for (int i = 0; i < impl_->queue_->size(); i++) {
+ q_el = impl_->queue_->get(i);
+ if (( seq == -1 &&
+ !q_el->get(0)->contains("reply")
+ ) || (
+ q_el->get(0)->contains("reply") &&
+ q_el->get(0)->get("reply")->intValue() == seq
+ )
+ ) {
+ env = q_el->get(0);
+ msg = q_el->get(1);
+ impl_->queue_->remove(i);
+ return true;
+ }
+ }
+ }
+
unsigned short header_length_net;
impl_->readData(&header_length_net, sizeof(header_length_net));
@@ -400,13 +401,28 @@
length - header_length);
std::stringstream header_wire_stream;
header_wire_stream << header_wire;
- env = Element::fromWire(header_wire_stream, header_length);
+ l_env = Element::fromWire(header_wire_stream, header_length);
std::stringstream body_wire_stream;
body_wire_stream << body_wire;
- msg = Element::fromWire(body_wire_stream, length - header_length);
-
- return (true);
+ l_msg = Element::fromWire(body_wire_stream, length - header_length);
+ if ((seq == -1 &&
+ !l_env->contains("reply")
+ ) || (
+ l_env->contains("reply") &&
+ l_env->get("reply")->intValue() == seq
+ )
+ ) {
+ env = l_env;
+ msg = l_msg;
+ return true;
+ } else {
+ ElementPtr q_el = Element::createFromString("[]");
+ q_el->add(l_env);
+ q_el->add(l_msg);
+ impl_->queue_->add(q_el);
+ return recvmsg(env, msg, nonblock, seq);
+ }
// XXXMLG handle non-block here, and return false for short reads
}
@@ -432,47 +448,55 @@
sendmsg(env);
}
-unsigned int
+int
Session::group_sendmsg(ElementPtr msg, std::string group,
std::string instance, std::string to)
{
ElementPtr env = Element::create(std::map<std::string, ElementPtr>());
-
+ int nseq = ++impl_->sequence_;
+
env->set("type", Element::create("send"));
env->set("from", Element::create(impl_->lname_));
env->set("to", Element::create(to));
env->set("group", Element::create(group));
env->set("instance", Element::create(instance));
- env->set("seq", Element::create(impl_->sequence_));
+ env->set("seq", Element::create(nseq));
//env->set("msg", Element::create(msg->toWire()));
sendmsg(env, msg);
-
- return (++impl_->sequence_);
+ return nseq;
}
bool
Session::group_recvmsg(ElementPtr& envelope, ElementPtr& msg,
- bool nonblock)
+ bool nonblock, int seq)
{
- return (recvmsg(envelope, msg, nonblock));
-}
-
-unsigned int
+ return (recvmsg(envelope, msg, nonblock, seq));
+}
+
+int
Session::reply(ElementPtr& envelope, ElementPtr& newmsg) {
ElementPtr env = Element::create(std::map<std::string, ElementPtr>());
-
+ int nseq = ++impl_->sequence_;
+
env->set("type", Element::create("send"));
env->set("from", Element::create(impl_->lname_));
env->set("to", Element::create(envelope->get("from")->stringValue()));
env->set("group", Element::create(envelope->get("group")->stringValue()));
env->set("instance", Element::create(envelope->get("instance")->stringValue()));
- env->set("seq", Element::create(impl_->sequence_));
+ env->set("seq", Element::create(nseq));
env->set("reply", Element::create(envelope->get("seq")->intValue()));
sendmsg(env, newmsg);
- return (++impl_->sequence_);
-}
-}
-}
+ return nseq;
+}
+
+bool
+Session::hasQueuedMsgs()
+{
+ return (impl_->queue_->size() > 0);
+}
+
+}
+}
Modified: trunk/src/lib/cc/session.h
==============================================================================
--- trunk/src/lib/cc/session.h (original)
+++ trunk/src/lib/cc/session.h Thu May 20 10:34:26 2010
@@ -65,23 +65,27 @@
void sendmsg(isc::data::ElementPtr& env,
isc::data::ElementPtr& msg);
bool recvmsg(isc::data::ElementPtr& msg,
- bool nonblock = true);
+ bool nonblock = true,
+ int seq = -1);
bool recvmsg(isc::data::ElementPtr& env,
isc::data::ElementPtr& msg,
- bool nonblock = true);
+ bool nonblock = true,
+ int seq = -1);
void subscribe(std::string group,
std::string instance = "*");
void unsubscribe(std::string group,
std::string instance = "*");
- unsigned int group_sendmsg(isc::data::ElementPtr msg,
+ int group_sendmsg(isc::data::ElementPtr msg,
std::string group,
std::string instance = "*",
std::string to = "*");
bool group_recvmsg(isc::data::ElementPtr& envelope,
isc::data::ElementPtr& msg,
- bool nonblock = true);
- unsigned int reply(isc::data::ElementPtr& envelope,
+ bool nonblock = true,
+ int seq = -1);
+ int reply(isc::data::ElementPtr& envelope,
isc::data::ElementPtr& newmsg);
+ bool hasQueuedMsgs();
};
} // namespace cc
} // namespace isc
Modified: trunk/src/lib/config/ccsession.cc
==============================================================================
--- trunk/src/lib/config/ccsession.cc (original)
+++ trunk/src/lib/config/ccsession.cc Thu May 20 10:34:26 2010
@@ -248,8 +248,8 @@
//session_.subscribe("statistics", "*");
// send the data specification
ElementPtr spec_msg = createCommand("module_spec", module_specification_.getFullSpec());
- session_.group_sendmsg(spec_msg, "ConfigManager");
- session_.group_recvmsg(env, answer, false);
+ unsigned int seq = session_.group_sendmsg(spec_msg, "ConfigManager");
+ session_.group_recvmsg(env, answer, false, seq);
int rcode;
ElementPtr err = parseAnswer(rcode, answer);
if (rcode != 0) {
@@ -260,8 +260,8 @@
// get any stored configuration from the manager
if (config_handler_) {
ElementPtr cmd = Element::createFromString("{ \"command\": [\"get_config\", {\"module_name\":\"" + module_name_ + "\"} ] }");
- session_.group_sendmsg(cmd, "ConfigManager");
- session_.group_recvmsg(env, answer, false);
+ seq = session_.group_sendmsg(cmd, "ConfigManager");
+ session_.group_recvmsg(env, answer, false, seq);
ElementPtr new_config = parseAnswer(rcode, answer);
if (rcode == 0) {
handleConfigUpdate(new_config);
@@ -308,6 +308,12 @@
ModuleCCSession::getSocket()
{
return (session_.getSocket());
+}
+
+bool
+ModuleCCSession::hasQueuedMsgs()
+{
+ return (session_.hasQueuedMsgs());
}
int
@@ -365,8 +371,8 @@
ElementPtr env, answer;
int rcode;
- session_.group_sendmsg(cmd, "ConfigManager");
- session_.group_recvmsg(env, answer, false);
+ unsigned int seq = session_.group_sendmsg(cmd, "ConfigManager");
+ session_.group_recvmsg(env, answer, false, seq);
ElementPtr new_config = parseAnswer(rcode, answer);
if (rcode == 0) {
rmod_config.setLocalConfig(new_config);
Modified: trunk/src/lib/config/ccsession.h
==============================================================================
--- trunk/src/lib/config/ccsession.h (original)
+++ trunk/src/lib/config/ccsession.h Thu May 20 10:34:26 2010
@@ -148,6 +148,16 @@
* channel.
*/
int getSocket();
+
+ /**
+ * Optional optimization for checkCommand loop; returns true
+ * if there are unhandled queued messages in the cc session.
+ * (if either this is true or there is data on the socket found
+ * by the select() call on getSocket(), run checkCommand())
+ *
+ * @return true if there are unhandled queued messages
+ */
+ bool hasQueuedMsgs();
/**
* Check if there is a command or config change on the command
Modified: trunk/src/lib/config/tests/Makefile.am
==============================================================================
--- trunk/src/lib/config/tests/Makefile.am (original)
+++ trunk/src/lib/config/tests/Makefile.am Thu May 20 10:34:26 2010
@@ -14,9 +14,9 @@
run_unittests_LDFLAGS = $(AM_LDFLAGS) $(GTEST_LDFLAGS)
run_unittests_LDADD = $(GTEST_LDADD)
run_unittests_LDADD += $(top_builddir)/src/lib/exceptions/libexceptions.la
+run_unittests_LDADD += libfake_session.la
run_unittests_LDADD += $(top_builddir)/src/lib/config/libcfgclient.la
run_unittests_LDADD += $(top_builddir)/src/lib/cc/data.o
-run_unittests_LDADD += libfake_session.la
if HAVE_BOOST_SYSTEM
run_unittests_LDFLAGS += $(AM_LDFLAGS) $(BOOST_LDFLAGS)
Modified: trunk/src/lib/config/tests/fake_session.cc
==============================================================================
--- trunk/src/lib/config/tests/fake_session.cc (original)
+++ trunk/src/lib/config/tests/fake_session.cc Thu May 20 10:34:26 2010
@@ -153,6 +153,11 @@
Session::~Session() {
}
+bool
+Session::connect() {
+ return true;
+}
+
void
Session::disconnect() {
}
@@ -188,7 +193,7 @@
}
bool
-Session::recvmsg(ElementPtr& msg, bool nonblock UNUSED_PARAM) {
+Session::recvmsg(ElementPtr& msg, bool nonblock UNUSED_PARAM, int seq UNUSED_PARAM) {
//cout << "[XX] client asks for message " << endl;
if (initial_messages &&
initial_messages->getType() == Element::list &&
@@ -202,7 +207,7 @@
}
bool
-Session::recvmsg(ElementPtr& env, ElementPtr& msg, bool nonblock UNUSED_PARAM) {
+Session::recvmsg(ElementPtr& env, ElementPtr& msg, bool nonblock UNUSED_PARAM, int seq UNUSED_PARAM) {
//cout << "[XX] client asks for message and env" << endl;
env = ElementPtr();
if (initial_messages &&
@@ -269,9 +274,9 @@
bool
Session::group_recvmsg(ElementPtr& envelope, ElementPtr& msg,
- bool nonblock)
-{
- return (recvmsg(envelope, msg, nonblock));
+ bool nonblock, int seq)
+{
+ return (recvmsg(envelope, msg, nonblock, seq));
}
unsigned int
@@ -282,5 +287,10 @@
return 1;
}
-}
-}
+bool
+Session::hasQueuedMsgs() {
+ return false;
+}
+
+}
+}
Modified: trunk/src/lib/config/tests/fake_session.h
==============================================================================
--- trunk/src/lib/config/tests/fake_session.h (original)
+++ trunk/src/lib/config/tests/fake_session.h Thu May 20 10:34:26 2010
@@ -74,15 +74,16 @@
void startRead(boost::function<void()> read_callback);
void establish();
+ bool connect();
void disconnect();
void sendmsg(isc::data::ElementPtr& msg);
void sendmsg(isc::data::ElementPtr& env,
isc::data::ElementPtr& msg);
bool recvmsg(isc::data::ElementPtr& msg,
- bool nonblock = true);
+ bool nonblock = true, int seq = -1);
bool recvmsg(isc::data::ElementPtr& env,
isc::data::ElementPtr& msg,
- bool nonblock = true);
+ bool nonblock = true, int seq = -1);
void subscribe(std::string group,
std::string instance = "*");
void unsubscribe(std::string group,
@@ -93,9 +94,11 @@
std::string to = "*");
bool group_recvmsg(isc::data::ElementPtr& envelope,
isc::data::ElementPtr& msg,
- bool nonblock = true);
+ bool nonblock = true,
+ int seq = -1);
unsigned int reply(isc::data::ElementPtr& envelope,
isc::data::ElementPtr& newmsg);
+ bool hasQueuedMsgs();
};
} // namespace cc
Modified: trunk/src/lib/python/isc/cc/session.py
==============================================================================
--- trunk/src/lib/python/isc/cc/session.py (original)
+++ trunk/src/lib/python/isc/cc/session.py Thu May 20 10:34:26 2010
@@ -17,6 +17,7 @@
import socket
import struct
import os
+import threading
import isc.cc.message
@@ -33,6 +34,7 @@
self._sequence = 1
self._closed = False
self._queue = []
+ self._lock = threading.RLock()
if port == 0:
if 'ISC_MSGQ_PORT' in os.environ:
@@ -64,56 +66,54 @@
self._closed = True
def sendmsg(self, env, msg = None):
- XXmsg = msg
- XXenv = env
- if self._closed:
- raise SessionError("Session has been closed.")
- if type(env) == dict:
- env = isc.cc.message.to_wire(env)
- if type(msg) == dict:
- msg = isc.cc.message.to_wire(msg)
- self._socket.setblocking(1)
- length = 2 + len(env);
- if msg:
- length += len(msg)
- self._socket.send(struct.pack("!I", length))
- self._socket.send(struct.pack("!H", len(env)))
- self._socket.send(env)
- if msg:
- self._socket.send(msg)
+ with self._lock:
+ if self._closed:
+ raise SessionError("Session has been closed.")
+ if type(env) == dict:
+ env = isc.cc.message.to_wire(env)
+ if type(msg) == dict:
+ msg = isc.cc.message.to_wire(msg)
+ self._socket.setblocking(1)
+ length = 2 + len(env);
+ if msg:
+ length += len(msg)
+ self._socket.send(struct.pack("!I", length))
+ self._socket.send(struct.pack("!H", len(env)))
+ self._socket.send(env)
+ if msg:
+ self._socket.send(msg)
def recvmsg(self, nonblock = True, seq = None):
- #print("[XX] queue len: " + str(len(self._queue)))
- if len(self._queue) > 0:
- if seq == None:
- #print("[XX] return first")
- return self._queue.pop(0)
- else:
+ with self._lock:
+ if len(self._queue) > 0:
i = 0;
- #print("[XX] check rest")
for env, msg in self._queue:
- if "reply" in env and seq == env["reply"]:
+ if seq != None and "reply" in env and seq == env["reply"]:
+ return self._queue.pop(i)
+ elif seq == None and "reply" not in env:
return self._queue.pop(i)
else:
i = i + 1
- #print("[XX] not found")
- if self._closed:
- raise SessionError("Session has been closed.")
- data = self._receive_full_buffer(nonblock)
- if data and len(data) > 2:
- header_length = struct.unpack('>H', data[0:2])[0]
- data_length = len(data) - 2 - header_length
- if data_length > 0:
- env = isc.cc.message.from_wire(data[2:header_length+2])
- msg = isc.cc.message.from_wire(data[header_length + 2:])
- if seq == None or "reply" in env and seq == env["reply"]:
- return env, msg
+ if self._closed:
+ raise SessionError("Session has been closed.")
+ data = self._receive_full_buffer(nonblock)
+ if data and len(data) > 2:
+ header_length = struct.unpack('>H', data[0:2])[0]
+ data_length = len(data) - 2 - header_length
+ if data_length > 0:
+ env = isc.cc.message.from_wire(data[2:header_length+2])
+ msg = isc.cc.message.from_wire(data[header_length + 2:])
+ if (seq == None and "reply" not in env) or (seq != None and "reply" in env and seq == env["reply"]):
+ return env, msg
+ else:
+ tmp = None
+ if "reply" in env:
+ tmp = env["reply"]
+ self._queue.append((env,msg))
+ return self.recvmsg(nonblock, seq)
else:
- self._queue.append((env,msg))
- return self.recvmsg(nonblock, seq)
- else:
- return isc.cc.message.from_wire(data[2:header_length+2]), None
- return None, None
+ return isc.cc.message.from_wire(data[2:header_length+2]), None
+ return None, None
def _receive_full_buffer(self, nonblock):
if nonblock:
@@ -130,7 +130,6 @@
return None
if data == "": # server closed connection
raise ProtocolError("Read of 0 bytes: connection closed")
-
self._recvbuffer += data
if len(self._recvbuffer) < 4:
return None
@@ -182,6 +181,9 @@
}, isc.cc.message.to_wire(msg))
return seq
+ def has_queued_msgs(self):
+ return len(self._queue) > 0
+
def group_recvmsg(self, nonblock = True, seq = None):
env, msg = self.recvmsg(nonblock, seq)
if env == None:
Modified: trunk/src/lib/python/isc/cc/tests/session_test.py
==============================================================================
--- trunk/src/lib/python/isc/cc/tests/session_test.py (original)
+++ trunk/src/lib/python/isc/cc/tests/session_test.py Thu May 20 10:34:26 2010
@@ -179,65 +179,82 @@
#print("sending message {'to': 'someone', 'reply': 1}, {'hello': 'a'}")
# simply get the message without asking for a specific sequence number reply
- sess._socket.addrecv(b'\x00\x00\x00(\x00\x19Skan\x02to(\x07someone\x05reply&\x011Skan\x05hello(\x01a')
- env, msg = sess.recvmsg(False)
- self.assertEqual({'to': 'someone', 'reply': 1}, env)
- self.assertEqual({"hello": "a"}, msg)
+ self.assertFalse(sess.has_queued_msgs())
+ sess._socket.addrecv(b'\x00\x00\x00(\x00\x19Skan\x02to(\x07someone\x05reply&\x011Skan\x05hello(\x01a')
+ env, msg = sess.recvmsg(False)
+ self.assertEqual({'to': 'someone', 'reply': 1}, env)
+ self.assertEqual({"hello": "a"}, msg)
+ self.assertFalse(sess.has_queued_msgs())
# simply get the message, asking for a specific sequence number reply
+ self.assertFalse(sess.has_queued_msgs())
sess._socket.addrecv(b'\x00\x00\x00(\x00\x19Skan\x02to(\x07someone\x05reply&\x011Skan\x05hello(\x01a')
env, msg = sess.recvmsg(False, 1)
self.assertEqual({'to': 'someone', 'reply': 1}, env)
self.assertEqual({"hello": "a"}, msg)
+ self.assertFalse(sess.has_queued_msgs())
# ask for a different sequence number reply (that doesn't exist)
# then ask for the one that is there
+ self.assertFalse(sess.has_queued_msgs())
sess._socket.addrecv(b'\x00\x00\x00(\x00\x19Skan\x02to(\x07someone\x05reply&\x011Skan\x05hello(\x01a')
env, msg = sess.recvmsg(False, 2)
self.assertEqual(None, env)
self.assertEqual(None, msg)
+ self.assertTrue(sess.has_queued_msgs())
env, msg = sess.recvmsg(False, 1)
self.assertEqual({'to': 'someone', 'reply': 1}, env)
self.assertEqual({"hello": "a"}, msg)
+ self.assertFalse(sess.has_queued_msgs())
# ask for a different sequence number reply (that doesn't exist)
# then ask for any message
+ self.assertFalse(sess.has_queued_msgs())
sess._socket.addrecv(b'\x00\x00\x00(\x00\x19Skan\x02to(\x07someone\x05reply&\x011Skan\x05hello(\x01a')
env, msg = sess.recvmsg(False, 2)
self.assertEqual(None, env)
self.assertEqual(None, msg)
- env, msg = sess.recvmsg(False)
- self.assertEqual({'to': 'someone', 'reply': 1}, env)
- self.assertEqual({"hello": "a"}, msg)
+ self.assertTrue(sess.has_queued_msgs())
+ env, msg = sess.recvmsg(False)
+ self.assertEqual({'to': 'someone', 'reply': 1}, env)
+ self.assertEqual({"hello": "a"}, msg)
+ self.assertFalse(sess.has_queued_msgs())
#print("sending message {'to': 'someone', 'reply': 1}, {'hello': 'a'}")
# ask for a different sequence number reply (that doesn't exist)
# send a new message, ask for any message (get the first)
# then ask for any message (get the second)
+ self.assertFalse(sess.has_queued_msgs())
sess._socket.addrecv(b'\x00\x00\x00(\x00\x19Skan\x02to(\x07someone\x05reply&\x011Skan\x05hello(\x01a')
env, msg = sess.recvmsg(False, 2)
self.assertEqual(None, env)
self.assertEqual(None, msg)
+ self.assertTrue(sess.has_queued_msgs())
sess._socket.addrecv(b'\x00\x00\x00\x1f\x00\x10Skan\x02to(\x07someoneSkan\x05hello(\x01b')
env, msg = sess.recvmsg(False)
self.assertEqual({'to': 'someone', 'reply': 1}, env)
self.assertEqual({"hello": "a"}, msg)
+ self.assertFalse(sess.has_queued_msgs())
env, msg = sess.recvmsg(False)
self.assertEqual({'to': 'someone'}, env)
self.assertEqual({"hello": "b"}, msg)
+ self.assertFalse(sess.has_queued_msgs())
# send a message, then one with specific reply value
# ask for that specific message (get the second)
# then ask for any message (get the first)
+ self.assertFalse(sess.has_queued_msgs())
sess._socket.addrecv(b'\x00\x00\x00\x1f\x00\x10Skan\x02to(\x07someoneSkan\x05hello(\x01b')
sess._socket.addrecv(b'\x00\x00\x00(\x00\x19Skan\x02to(\x07someone\x05reply&\x011Skan\x05hello(\x01a')
env, msg = sess.recvmsg(False, 1)
self.assertEqual({'to': 'someone', 'reply': 1}, env)
self.assertEqual({"hello": "a"}, msg)
+ self.assertTrue(sess.has_queued_msgs())
env, msg = sess.recvmsg(False)
self.assertEqual({'to': 'someone'}, env)
self.assertEqual({"hello": "b"}, msg)
+ self.assertFalse(sess.has_queued_msgs())
def test_next_sequence(self):
sess = MySession()
More information about the bind10-changes
mailing list