[svn] commit: r1663 - in /branches/trac58/src/lib: cc/session.cc python/isc/cc/session.py
BIND 10 source code commits
bind10-changes at lists.isc.org
Wed Mar 31 14:51:19 UTC 2010
Author: jelte
Date: Wed Mar 31 14:51:18 2010
New Revision: 1663
Log:
b10-cmdctl sometimes tries to read a command from the command channel twice at the same time (one when excpecting an answer to a command, and one listening for commands in general)
Added two things:
the session cc/session.py now has a lock in recvmsg() and sendmsg(), so that it does not try to read a message while it is still reading one
recvmsg() with no sequence id specified does not return a message that actually is a reply to a command, so that the general command reader does not eat away the reply expected in send/recv pairs
We might need to rethink the latter; this means that you will never get a message sent with group_reply unless you explicitely specify you want that reply. The other option is to put way more severe restrictions on how to read messages from the channel.
Modified:
branches/trac58/src/lib/cc/session.cc
branches/trac58/src/lib/python/isc/cc/session.py
Modified: branches/trac58/src/lib/cc/session.cc
==============================================================================
--- branches/trac58/src/lib/cc/session.cc (original)
+++ branches/trac58/src/lib/cc/session.cc Wed Mar 31 14:51:18 2010
@@ -364,23 +364,20 @@
size_t length = impl_->readDataLength();
ElementPtr l_env, l_msg;
if (hasQueuedMsgs()) {
- if (seq == -1) {
- env = impl_->queue_->get(0)->get(0);
- msg = impl_->queue_->get(0)->get(1);
- impl_->queue_->remove(0);
- return true;
- } else {
- ElementPtr q_el;
- for (int i = 0; i < impl_->queue_->size(); i++) {
- q_el = impl_->queue_->get(i);
- if (q_el->get(0)->contains("reply") &&
- q_el->get(0)->get("reply")->intValue() == seq
- ) {
- env = q_el->get(0);
- msg = q_el->get(1);
- impl_->queue_->remove(i);
- return true;
- }
+ ElementPtr q_el;
+ for (int i = 0; i < impl_->queue_->size(); i++) {
+ q_el = impl_->queue_->get(i);
+ if (( seq == -1 &&
+ !q_el->get(0)->contains("reply")
+ ) || (
+ q_el->get(0)->contains("reply") &&
+ q_el->get(0)->get("reply")->intValue() == seq
+ )
+ ) {
+ env = q_el->get(0);
+ msg = q_el->get(1);
+ impl_->queue_->remove(i);
+ return true;
}
}
}
@@ -409,8 +406,10 @@
std::stringstream body_wire_stream;
body_wire_stream << body_wire;
l_msg = Element::fromWire(body_wire_stream, length - header_length);
- if (seq == -1 ||
- (l_env->contains("reply") &&
+ if ((seq == -1 &&
+ !l_env->contains("reply")
+ ) || (
+ l_env->contains("reply") &&
l_env->get("reply")->intValue() == seq
)
) {
Modified: branches/trac58/src/lib/python/isc/cc/session.py
==============================================================================
--- branches/trac58/src/lib/python/isc/cc/session.py (original)
+++ branches/trac58/src/lib/python/isc/cc/session.py Wed Mar 31 14:51:18 2010
@@ -17,6 +17,7 @@
import socket
import struct
import os
+import threading
import isc.cc.message
@@ -33,6 +34,7 @@
self._sequence = 1
self._closed = False
self._queue = []
+ self._lock = threading.RLock()
if port == 0:
if 'ISC_MSGQ_PORT' in os.environ:
@@ -64,52 +66,54 @@
self._closed = True
def sendmsg(self, env, msg = None):
- XXmsg = msg
- XXenv = env
- if self._closed:
- raise SessionError("Session has been closed.")
- if type(env) == dict:
- env = isc.cc.message.to_wire(env)
- if type(msg) == dict:
- msg = isc.cc.message.to_wire(msg)
- self._socket.setblocking(1)
- length = 2 + len(env);
- if msg:
- length += len(msg)
- self._socket.send(struct.pack("!I", length))
- self._socket.send(struct.pack("!H", len(env)))
- self._socket.send(env)
- if msg:
- self._socket.send(msg)
+ with self._lock:
+ if self._closed:
+ raise SessionError("Session has been closed.")
+ if type(env) == dict:
+ env = isc.cc.message.to_wire(env)
+ if type(msg) == dict:
+ msg = isc.cc.message.to_wire(msg)
+ self._socket.setblocking(1)
+ length = 2 + len(env);
+ if msg:
+ length += len(msg)
+ self._socket.send(struct.pack("!I", length))
+ self._socket.send(struct.pack("!H", len(env)))
+ self._socket.send(env)
+ if msg:
+ self._socket.send(msg)
def recvmsg(self, nonblock = True, seq = None):
- if len(self._queue) > 0:
- if seq == None:
- return self._queue.pop(0)
- else:
+ with self._lock:
+ if len(self._queue) > 0:
i = 0;
for env, msg in self._queue:
- if "reply" in env and seq == env["reply"]:
+ if seq != None and "reply" in env and seq == env["reply"]:
+ return self._queue.pop(i)
+ elif seq == None and "reply" not in env:
return self._queue.pop(i)
else:
i = i + 1
- if self._closed:
- raise SessionError("Session has been closed.")
- data = self._receive_full_buffer(nonblock)
- if data and len(data) > 2:
- header_length = struct.unpack('>H', data[0:2])[0]
- data_length = len(data) - 2 - header_length
- if data_length > 0:
- env = isc.cc.message.from_wire(data[2:header_length+2])
- msg = isc.cc.message.from_wire(data[header_length + 2:])
- if seq == None or "reply" in env and seq == env["reply"]:
- return env, msg
+ if self._closed:
+ raise SessionError("Session has been closed.")
+ data = self._receive_full_buffer(nonblock)
+ if data and len(data) > 2:
+ header_length = struct.unpack('>H', data[0:2])[0]
+ data_length = len(data) - 2 - header_length
+ if data_length > 0:
+ env = isc.cc.message.from_wire(data[2:header_length+2])
+ msg = isc.cc.message.from_wire(data[header_length + 2:])
+ if (seq == None and "reply" not in env) or (seq != None and "reply" in env and seq == env["reply"]):
+ return env, msg
+ else:
+ tmp = None
+ if "reply" in env:
+ tmp = env["reply"]
+ self._queue.append((env,msg))
+ return self.recvmsg(nonblock, seq)
else:
- self._queue.append((env,msg))
- return self.recvmsg(nonblock, seq)
- else:
- return isc.cc.message.from_wire(data[2:header_length+2]), None
- return None, None
+ return isc.cc.message.from_wire(data[2:header_length+2]), None
+ return None, None
def _receive_full_buffer(self, nonblock):
if nonblock:
@@ -126,7 +130,6 @@
return None
if data == "": # server closed connection
raise ProtocolError("Read of 0 bytes: connection closed")
-
self._recvbuffer += data
if len(self._recvbuffer) < 4:
return None
More information about the bind10-changes
mailing list