[svn] commit: r1663 - in /branches/trac58/src/lib: cc/session.cc python/isc/cc/session.py

BIND 10 source code commits bind10-changes at lists.isc.org
Wed Mar 31 14:51:19 UTC 2010


Author: jelte
Date: Wed Mar 31 14:51:18 2010
New Revision: 1663

Log:
b10-cmdctl sometimes tries to read a command from the command channel twice at the same time (once when expecting an answer to a command, and once when listening for commands in general).

Added two things:
The session in cc/session.py now has a lock in recvmsg() and sendmsg(), so that it does not try to read a message while it is still reading one.

recvmsg() with no sequence id specified does not return a message that actually is a reply to a command, so that the general command reader does not eat away the reply expected in send/recv pairs

We might need to rethink the latter; this means that you will never get a message sent with group_reply unless you explicitly specify that you want that reply. The other option is to put much more severe restrictions on how to read messages from the channel.

Modified:
    branches/trac58/src/lib/cc/session.cc
    branches/trac58/src/lib/python/isc/cc/session.py

Modified: branches/trac58/src/lib/cc/session.cc
==============================================================================
--- branches/trac58/src/lib/cc/session.cc (original)
+++ branches/trac58/src/lib/cc/session.cc Wed Mar 31 14:51:18 2010
@@ -364,23 +364,20 @@
     size_t length = impl_->readDataLength();
     ElementPtr l_env, l_msg;
     if (hasQueuedMsgs()) {
-        if (seq == -1) {
-            env = impl_->queue_->get(0)->get(0);
-            msg = impl_->queue_->get(0)->get(1);
-            impl_->queue_->remove(0);
-            return true;
-        } else {
-            ElementPtr q_el;
-            for (int i = 0; i < impl_->queue_->size(); i++) {
-                q_el = impl_->queue_->get(i);
-                if (q_el->get(0)->contains("reply") &&
-                    q_el->get(0)->get("reply")->intValue() == seq
-                   ) {
-                       env = q_el->get(0);
-                       msg = q_el->get(1);
-                       impl_->queue_->remove(i);
-                       return true;
-                }
+        ElementPtr q_el;
+        for (int i = 0; i < impl_->queue_->size(); i++) {
+            q_el = impl_->queue_->get(i);
+            if (( seq == -1 &&
+                  !q_el->get(0)->contains("reply")
+                ) || (
+                  q_el->get(0)->contains("reply") &&
+                  q_el->get(0)->get("reply")->intValue() == seq
+                )
+               ) {
+                   env = q_el->get(0);
+                   msg = q_el->get(1);
+                   impl_->queue_->remove(i);
+                   return true;
             }
         }
     }
@@ -409,8 +406,10 @@
     std::stringstream body_wire_stream;
     body_wire_stream << body_wire;
     l_msg = Element::fromWire(body_wire_stream, length - header_length);
-    if (seq == -1 ||
-        (l_env->contains("reply") &&
+    if ((seq == -1 &&
+         !l_env->contains("reply")
+        ) || (
+         l_env->contains("reply") &&
          l_env->get("reply")->intValue() == seq
         )
        ) {

Modified: branches/trac58/src/lib/python/isc/cc/session.py
==============================================================================
--- branches/trac58/src/lib/python/isc/cc/session.py (original)
+++ branches/trac58/src/lib/python/isc/cc/session.py Wed Mar 31 14:51:18 2010
@@ -17,6 +17,7 @@
 import socket
 import struct
 import os
+import threading
 
 import isc.cc.message
 
@@ -33,6 +34,7 @@
         self._sequence = 1
         self._closed = False
         self._queue = []
+        self._lock = threading.RLock()
 
         if port == 0:
 	        if 'ISC_MSGQ_PORT' in os.environ:
@@ -64,52 +66,54 @@
         self._closed = True
 
     def sendmsg(self, env, msg = None):
-        XXmsg = msg
-        XXenv = env
-        if self._closed:
-            raise SessionError("Session has been closed.")
-        if type(env) == dict:
-            env = isc.cc.message.to_wire(env)
-        if type(msg) == dict:
-            msg = isc.cc.message.to_wire(msg)
-        self._socket.setblocking(1)
-        length = 2 + len(env);
-        if msg:
-            length += len(msg)
-        self._socket.send(struct.pack("!I", length))
-        self._socket.send(struct.pack("!H", len(env)))
-        self._socket.send(env)
-        if msg:
-            self._socket.send(msg)
+        with self._lock:
+            if self._closed:
+                raise SessionError("Session has been closed.")
+            if type(env) == dict:
+                env = isc.cc.message.to_wire(env)
+            if type(msg) == dict:
+                msg = isc.cc.message.to_wire(msg)
+            self._socket.setblocking(1)
+            length = 2 + len(env);
+            if msg:
+                length += len(msg)
+            self._socket.send(struct.pack("!I", length))
+            self._socket.send(struct.pack("!H", len(env)))
+            self._socket.send(env)
+            if msg:
+                self._socket.send(msg)
 
     def recvmsg(self, nonblock = True, seq = None):
-        if len(self._queue) > 0:
-            if seq == None:
-                return self._queue.pop(0)
-            else:
+        with self._lock:
+            if len(self._queue) > 0:
                 i = 0;
                 for env, msg in self._queue:
-                    if "reply" in env and seq == env["reply"]:
+                    if seq != None and "reply" in env and seq == env["reply"]:
+                        return self._queue.pop(i)
+                    elif seq == None and "reply" not in env:
                         return self._queue.pop(i)
                     else:
                         i = i + 1
-        if self._closed:
-            raise SessionError("Session has been closed.")
-        data = self._receive_full_buffer(nonblock)
-        if data and len(data) > 2:
-            header_length = struct.unpack('>H', data[0:2])[0]
-            data_length = len(data) - 2 - header_length
-            if data_length > 0:
-                env = isc.cc.message.from_wire(data[2:header_length+2])
-                msg = isc.cc.message.from_wire(data[header_length + 2:])
-                if seq == None or "reply" in env and seq == env["reply"]:
-                    return env, msg
+            if self._closed:
+                raise SessionError("Session has been closed.")
+            data = self._receive_full_buffer(nonblock)
+            if data and len(data) > 2:
+                header_length = struct.unpack('>H', data[0:2])[0]
+                data_length = len(data) - 2 - header_length
+                if data_length > 0:
+                    env = isc.cc.message.from_wire(data[2:header_length+2])
+                    msg = isc.cc.message.from_wire(data[header_length + 2:])
+                    if (seq == None and "reply" not in env) or (seq != None and "reply" in env and seq == env["reply"]):
+                        return env, msg
+                    else:
+                        tmp = None
+                        if "reply" in env:
+                            tmp = env["reply"]
+                        self._queue.append((env,msg))
+                        return self.recvmsg(nonblock, seq)
                 else:
-                    self._queue.append((env,msg))
-                    return self.recvmsg(nonblock, seq)
-            else:
-                return isc.cc.message.from_wire(data[2:header_length+2]), None
-        return None, None
+                    return isc.cc.message.from_wire(data[2:header_length+2]), None
+            return None, None
 
     def _receive_full_buffer(self, nonblock):
         if nonblock:
@@ -126,7 +130,6 @@
                 return None
             if data == "": # server closed connection
                 raise ProtocolError("Read of 0 bytes: connection closed")
-
             self._recvbuffer += data
             if len(self._recvbuffer) < 4:
                 return None




More information about the bind10-changes mailing list