[svn] commit: r332 - in /branches/parkinglot/src: bin/pymsgq/msgq.py lib/cc/cpp/session.cc lib/cc/cpp/session.h lib/cc/python/ISC/CC/session.py
BIND 10 source code commits
bind10-changes at lists.isc.org
Thu Dec 3 02:37:41 UTC 2009
Author: mgraff
Date: Thu Dec 3 02:37:40 2009
New Revision: 332
Log:
New wire format, which makes it saner to process the envelope separately from the message body. No API changes. The current msgq does not support this format, but the pymsgq, which I'm hoping to finish up tomorrow, will.
Modified:
branches/parkinglot/src/bin/pymsgq/msgq.py
branches/parkinglot/src/lib/cc/cpp/session.cc
branches/parkinglot/src/lib/cc/cpp/session.h
branches/parkinglot/src/lib/cc/python/ISC/CC/session.py
Modified: branches/parkinglot/src/bin/pymsgq/msgq.py
==============================================================================
--- branches/parkinglot/src/bin/pymsgq/msgq.py (original)
+++ branches/parkinglot/src/bin/pymsgq/msgq.py Thu Dec 3 02:37:40 2009
@@ -10,7 +10,7 @@
import os
import socket
import sys
-import re
+import struct
import errno
import time
import select
@@ -18,6 +18,8 @@
from optparse import OptionParser, OptionValueError
import ISC.CC
+
+class MsgQReceiveError(Exception): pass
# This is the version that gets displayed to the user.
__version__ = "v20091030 (Paving the DNS Parking Lot)"
@@ -63,12 +65,14 @@
self.runnable = True
def process_accept(self):
+ """Process an accept on the listening socket."""
newsocket, ipaddr = self.listen_socket.accept()
sys.stderr.write("Connection\n")
self.sockets[newsocket.fileno()] = newsocket
self.poller.register(newsocket, select.POLLIN)
def process_socket(self, fd):
+ """Process a read on a socket."""
sock = self.sockets[fd]
if sock == None:
sys.stderr.write("Got read on Strange Socket fd %d\n" % fd)
@@ -76,19 +80,98 @@
sys.stderr.write("Got read on fd %d\n" %fd)
self.process_packet(fd, sock)
+ def kill_socket(self, fd, sock):
+ """Fully close down the socket."""
+ self.poller.unregister(sock)
+ sock.close()
+ self.sockets[fd] = None
+ sys.stderr.write("Closing socket fd %d\n" % fd)
+
+ def getbytes(self, fd, sock, length):
+ """Get exactly the requested bytes, or raise an exception if
+ EOF."""
+ received = b''
+ while len(received) < length:
+ data = sock.recv(length - len(received))
+ if len(data) == 0:
+ raise MsgQReceiveError("EOF")
+ received += data
+ return received
+
+ def read_packet(self, fd, sock):
+ """Read a correctly formatted packet. Will raise exceptions if
+ something fails."""
+ lengths = self.getbytes(fd, sock, 6)
+ overall_length, routing_length = struct.unpack(">IH", lengths)
+ if overall_length < 2:
+ raise MsgQReceiveError("overall_length < 2")
+ overall_length -= 2
+ sys.stderr.write("overall length: %d, routing_length %d\n"
+ % (overall_length, routing_length))
+ if routing_length > overall_length:
+ raise MsgQReceiveError("routing_length > overall_length")
+ if routing_length == 0:
+ raise MsgQReceiveError("routing_length == 0")
+ data_length = overall_length - routing_length
+ # probably need to sanity check lengths here...
+ routing = self.getbytes(fd, sock, routing_length)
+ if data_length > 0:
+ data = self.getbytes(fd, sock, data_length)
+ else:
+ data = None
+ return (routing, data)
+
def process_packet(self, fd, sock):
- data = sock.recv(4)
- if len(data) == 0:
- self.poller.unregister(sock)
- sock.close()
- self.sockets[fd] = None
- sys.stderr.write("Closing socket fd %d\n" % fd)
+ """Process one packet."""
+ try:
+ routing, data = self.read_packet(fd, sock)
+ except MsgQReceiveError as err:
+ self.kill_socket(fd, sock)
+ sys.stderr.write("Receive error: %s\n" % err)
return
- sys.stderr.write("Got data: %s\n" % data)
+
+ try:
+ routingmsg = ISC.CC.Message.from_wire(routing)
+ except DecodeError as err:
+ self.kill_socket(fd, sock)
+ sys.stderr.write("Routing decode error: %s\n" % err)
+ return
+
+ sys.stdout.write("\t" + pprint.pformat(routingmsg) + "\n")
+ sys.stdout.write("\t" + pprint.pformat(data) + "\n")
+
+ self.process_command(fd, sock, routingmsg, data)
+
+ def process_command(self, fd, sock, routing, data):
+ """Process a single command. This will split out into one of the
+ other functions, above."""
+ cmd = routing["type"]
+ if cmd == 'getlname':
+ self.process_command_getlname(sock, routing, data)
+ elif cmd == 'send':
+ self.process_command_send(sock, routing, data)
+ else:
+ sys.stderr.write("Invalid command: %s\n" % cmd)
+
+ def sendmsg(self, sock, env, msg = None):
+ if type(env) == dict:
+ env = ISC.CC.Message.to_wire(env)
+ if type(msg) == dict:
+ msg = ISC.CC.Message.to_wire(msg)
+ sock.setblocking(1)
+ length = 2 + len(env);
+ if msg:
+ length += len(msg)
+ sock.send(struct.pack("!IH", length, len(env)))
+ sock.send(env)
+ if msg:
+ sock.send(msg)
+
+ def process_command_getlname(self, sock, routing, data):
+ self.sendmsg(sock, { "type" : "getlname" }, { "lname" : "staticlname" })
def run(self):
"""Process messages. Forever. Mostly."""
-
while True:
try:
events = self.poller.poll()
Modified: branches/parkinglot/src/lib/cc/cpp/session.cc
==============================================================================
--- branches/parkinglot/src/lib/cc/cpp/session.cc (original)
+++ branches/parkinglot/src/lib/cc/cpp/session.cc Thu Dec 3 02:37:40 2009
@@ -73,13 +73,45 @@
std::string wire = msg->to_wire();
unsigned int length = wire.length();
unsigned int length_net = htonl(length);
+ unsigned short header_length_net = htons(length);
unsigned int ret;
ret = write(sock, &length_net, 4);
if (ret != 4)
throw SessionError("Short write");
+ ret = write(sock, &header_length_net, 2);
+ if (ret != 2)
+ throw SessionError("Short write");
+
ret = write(sock, wire.c_str(), length);
+ if (ret != length)
+ throw SessionError("Short write");
+}
+
+void
+Session::sendmsg(ElementPtr& env, ElementPtr& msg)
+{
+ std::string header_wire = env->to_wire();
+ std::string body_wire = msg->to_wire();
+ unsigned int length = 2 + header_wire.length() + body_wire.length();
+ unsigned int length_net = htonl(length);
+ unsigned short header_length = header_wire.length();
+ unsigned short header_length_net = htons(header_length);
+ unsigned int ret;
+
+ ret = write(sock, &length_net, 4);
+ if (ret != 4)
+ throw SessionError("Short write");
+
+ ret = write(sock, &header_length_net, 2);
+ if (ret != 2)
+ throw SessionError("Short write");
+
+ std::cout << "[XX] Header length sending: " << header_length << std::endl;
+
+ ret = write(sock, header_wire.c_str(), header_length);
+ ret = write(sock, body_wire.c_str(), body_wire.length());
if (ret != length)
throw SessionError("Short write");
}
@@ -88,13 +120,23 @@
Session::recvmsg(ElementPtr& msg, bool nonblock)
{
unsigned int length_net;
+ unsigned short header_length_net;
unsigned int ret;
ret = read(sock, &length_net, 4);
if (ret != 4)
throw SessionError("Short read");
- unsigned int length = ntohl(length_net);
+ ret = read(sock, &header_length_net, 2);
+ if (ret != 2)
+ throw SessionError("Short read");
+
+ unsigned int length = ntohl(length_net) - 2;
+ unsigned short header_length = ntohs(header_length_net);
+ if (header_length != length) {
+ throw SessionError("Received non-empty body where only a header expected");
+ }
+
char *buffer = new char[length];
ret = read(sock, buffer, length);
if (ret != length)
@@ -107,6 +149,48 @@
wire_stream <<wire;
msg = Element::from_wire(wire_stream, length);
+
+ return (true);
+ // XXXMLG handle non-block here, and return false for short reads
+}
+
+bool
+Session::recvmsg(ElementPtr& env, ElementPtr& msg, bool nonblock)
+{
+ unsigned int length_net;
+ unsigned short header_length_net;
+ unsigned int ret;
+
+ ret = read(sock, &length_net, 4);
+ if (ret != 4)
+ throw SessionError("Short read");
+
+ ret = read(sock, &header_length_net, 2);
+ if (ret != 2)
+ throw SessionError("Short read");
+
+ unsigned int length = ntohl(length_net);
+ unsigned short header_length = ntohs(header_length_net);
+
+ if (header_length > length)
+ throw SessionError("Bad header length");
+
+ char *buffer = new char[length];
+ ret = read(sock, buffer, length);
+ if (ret != length)
+ throw SessionError("Short read");
+
+ std::string header_wire = std::string(buffer, header_length);
+ std::string body_wire = std::string(buffer, length - header_length);
+ delete [] buffer;
+
+ std::stringstream header_wire_stream;
+ header_wire_stream << header_wire;
+ env = Element::from_wire(header_wire_stream, length);
+
+ std::stringstream body_wire_stream;
+ body_wire_stream << body_wire;
+ msg = Element::from_wire(body_wire_stream, length - header_length);
return (true);
// XXXMLG handle non-block here, and return false for short reads
@@ -148,9 +232,9 @@
env->set("group", Element::create(group));
env->set("instance", Element::create(instance));
env->set("seq", Element::create(sequence));
- env->set("msg", Element::create(msg->to_wire()));
-
- sendmsg(env);
+ //env->set("msg", Element::create(msg->to_wire()));
+
+ sendmsg(env, msg);
return (sequence++);
}
@@ -158,13 +242,10 @@
bool
Session::group_recvmsg(ElementPtr& envelope, ElementPtr& msg, bool nonblock)
{
- bool got_message = recvmsg(envelope, nonblock);
+ bool got_message = recvmsg(envelope, msg, nonblock);
if (!got_message) {
return false;
}
-
- msg = Element::from_wire(envelope->get("msg")->string_value());
- envelope->remove("msg");
return (true);
}
@@ -180,10 +261,9 @@
env->set("group", Element::create(envelope->get("group")->string_value()));
env->set("instance", Element::create(envelope->get("instance")->string_value()));
env->set("seq", Element::create(sequence));
- env->set("msg", Element::create(newmsg->to_wire()));
env->set("reply", Element::create(envelope->get("seq")->string_value()));
- sendmsg(env);
+ sendmsg(env, newmsg);
return (sequence++);
}
Modified: branches/parkinglot/src/lib/cc/cpp/session.h
==============================================================================
--- branches/parkinglot/src/lib/cc/cpp/session.h (original)
+++ branches/parkinglot/src/lib/cc/cpp/session.h Thu Dec 3 02:37:40 2009
@@ -36,7 +36,11 @@
void establish();
void disconnect();
void sendmsg(ISC::Data::ElementPtr& msg);
+ void sendmsg(ISC::Data::ElementPtr& env, ISC::Data::ElementPtr& msg);
bool recvmsg(ISC::Data::ElementPtr& msg,
+ bool nonblock = true);
+ bool recvmsg(ISC::Data::ElementPtr& env,
+ ISC::Data::ElementPtr& msg,
bool nonblock = true);
void subscribe(std::string group,
std::string instance = "*",
Modified: branches/parkinglot/src/lib/cc/python/ISC/CC/session.py
==============================================================================
--- branches/parkinglot/src/lib/cc/python/ISC/CC/session.py (original)
+++ branches/parkinglot/src/lib/cc/python/ISC/CC/session.py Thu Dec 3 02:37:40 2009
@@ -37,7 +37,7 @@
self._socket.connect(tuple(['127.0.0.1', port]))
self.sendmsg({ "type": "getlname" })
- msg = self.recvmsg(False)
+ env, msg = self.recvmsg(False)
self._lname = msg["lname"]
if not self._lname:
raise ProtocolError("Could not get local name")
@@ -48,18 +48,31 @@
def lname(self):
return self._lname
- def sendmsg(self, msg):
+ def sendmsg(self, env, msg = None):
+ if type(env) == dict:
+ env = Message.to_wire(env)
if type(msg) == dict:
msg = Message.to_wire(msg)
self._socket.setblocking(1)
- self._socket.send(struct.pack("!I", len(msg)))
- self._socket.send(msg)
+ length = 2 + len(env);
+ if msg:
+ length += len(msg)
+ self._socket.send(struct.pack("!I", length))
+ self._socket.send(struct.pack("!H", len(env)))
+ self._socket.send(env)
+ if msg:
+ self._socket.send(msg)
def recvmsg(self, nonblock = True):
data = self._receive_full_buffer(nonblock)
- if data:
- return Message.from_wire(data)
- return None
+ if data and len(data) > 2:
+ header_length = struct.unpack('>H', data[0:2])[0]
+ data_length = len(data) - 2 - header_length
+ if data_length > 0:
+ return Message.from_wire(data[2:header_length+2]), Message.from_wire(data[header_length + 2:])
+ else:
+ return Message.from_wire(data[2:header_length+2]), None
+ return None, None
def _receive_full_buffer(self, nonblock):
if nonblock:
@@ -127,20 +140,15 @@
"group": group,
"instance": instance,
"seq": seq,
- "msg": Message.to_wire(msg),
- })
+ }, Message.to_wire(msg))
return seq
def group_recvmsg(self, nonblock = True):
- env = self.recvmsg(nonblock)
+ env, msg = self.recvmsg(nonblock)
if env == None:
# return none twice to match normal return value
# (so caller won't get a type error on no data)
return (None, None)
- if type(env["msg"]) != bytearray:
- msg = Message.from_wire(env["msg"].encode('ascii'))
- else:
- msg = Message.from_wire(env["msg"])
return (msg, env)
def group_reply(self, routing, msg):
@@ -153,8 +161,7 @@
"instance": routing["instance"],
"seq": seq,
"reply": routing["seq"],
- "msg": Message.to_wire(msg),
- })
+ }, Message.to_wire(msg))
return seq
if __name__ == "__main__":
More information about the bind10-changes
mailing list