[svn] commit: r333 - in /branches/parkinglot/src: bin/pymsgq/msgq.py bin/pymsgq/msgq_test.py lib/cc/python/test_session.py
BIND 10 source code commits
bind10-changes at lists.isc.org
Thu Dec 3 06:42:40 UTC 2009
Author: mgraff
Date: Thu Dec 3 06:42:40 2009
New Revision: 333
Log:
checkpoint work; Python-based msgq mostly works. Bad input will crash it, which should be fixed, probably by wrapping the entire message processing in a try/except block. Gross, but...
Added:
branches/parkinglot/src/bin/pymsgq/msgq_test.py
branches/parkinglot/src/lib/cc/python/test_session.py
Modified:
branches/parkinglot/src/bin/pymsgq/msgq.py
Modified: branches/parkinglot/src/bin/pymsgq/msgq.py
==============================================================================
--- branches/parkinglot/src/bin/pymsgq/msgq.py (original)
+++ branches/parkinglot/src/bin/pymsgq/msgq.py Thu Dec 3 06:42:40 2009
@@ -15,14 +15,59 @@
import time
import select
import pprint
+import random
from optparse import OptionParser, OptionValueError
import ISC.CC
-
-class MsgQReceiveError(Exception): pass
# This is the version that gets displayed to the user.
__version__ = "v20091030 (Paving the DNS Parking Lot)"
+
+class MsgQReceiveError(Exception): pass
+
+class SubscriptionManager:
+ def __init__(self):
+ self.subscriptions = {}
+
+ def subscribe(self, group, instance, socket):
+ """Add a subscription."""
+ target = ( group, instance )
+ if target in self.subscriptions:
+ print("Appending to existing target")
+ self.subscriptions[target].append(socket)
+ else:
+ print("Creating new target")
+ self.subscriptions[target] = [ socket ]
+
+ def unsubscribe(self, group, instance, socket):
+ """Remove the socket from the one specific subscription."""
+ target = ( group, instance )
+ if target in self.subscriptions:
+ while socket in self.subscriptions[target]:
+ self.subscriptions[target].remove(socket)
+
+ def unsubscribe_all(self, socket):
+ """Remove the socket from all subscriptions."""
+ for socklist in self.subscriptions.values():
+ while socket in socklist:
+ socklist.remove(socket)
+
+ def find_sub(self, group, instance):
+ """Return an array of sockets which want this specific group,
+ instance."""
+ target = (group, instance)
+ if target in self.subscriptions:
+ return self.subscriptions[target]
+ else:
+ return []
+
+ def find(self, group, instance):
+ """Return an array of sockets who should get something sent to
+ this group, instance pair. This includes wildcard subscriptions."""
+ target = (group, instance)
+ partone = self.find_sub(group, instance)
+ parttwo = self.find_sub(group, "*")
+ return list(set(partone + parttwo))
class MsgQ:
"""Message Queue class."""
@@ -39,6 +84,9 @@
self.runnable = False
self.listen_socket = False
self.sockets = {}
+ self.connection_counter = random.random()
+ self.hostname = socket.gethostname()
+ self.subs = SubscriptionManager()
def setup_poller(self):
"""Set up the poll thing. Internal function."""
@@ -77,12 +125,13 @@
if sock == None:
sys.stderr.write("Got read on Strange Socket fd %d\n" % fd)
return
- sys.stderr.write("Got read on fd %d\n" %fd)
+# sys.stderr.write("Got read on fd %d\n" %fd)
self.process_packet(fd, sock)
def kill_socket(self, fd, sock):
"""Fully close down the socket."""
self.poller.unregister(sock)
+ self.subs.unsubscribe_all(sock)
sock.close()
self.sockets[fd] = None
sys.stderr.write("Closing socket fd %d\n" % fd)
@@ -106,8 +155,6 @@
if overall_length < 2:
raise MsgQReceiveError("overall_length < 2")
overall_length -= 2
- sys.stderr.write("overall length: %d, routing_length %d\n"
- % (overall_length, routing_length))
if routing_length > overall_length:
raise MsgQReceiveError("routing_length > overall_length")
if routing_length == 0:
@@ -137,8 +184,8 @@
sys.stderr.write("Routing decode error: %s\n" % err)
return
- sys.stdout.write("\t" + pprint.pformat(routingmsg) + "\n")
- sys.stdout.write("\t" + pprint.pformat(data) + "\n")
+# sys.stdout.write("\t" + pprint.pformat(routingmsg) + "\n")
+# sys.stdout.write("\t" + pprint.pformat(data) + "\n")
self.process_command(fd, sock, routingmsg, data)
@@ -146,29 +193,77 @@
"""Process a single command. This will split out into one of the
other functions, above."""
cmd = routing["type"]
- if cmd == 'getlname':
+ if cmd == 'send':
+ self.process_command_send(sock, routing, data)
+ elif cmd == 'subscribe':
+ self.process_command_subscribe(sock, routing, data)
+ elif cmd == 'unsubscribe':
+ self.process_command_unsubscribe(sock, routing, data)
+ elif cmd == 'getlname':
self.process_command_getlname(sock, routing, data)
- elif cmd == 'send':
- self.process_command_send(sock, routing, data)
else:
sys.stderr.write("Invalid command: %s\n" % cmd)
- def sendmsg(self, sock, env, msg = None):
+ def preparemsg(self, env, msg = None):
if type(env) == dict:
env = ISC.CC.Message.to_wire(env)
if type(msg) == dict:
msg = ISC.CC.Message.to_wire(msg)
- sock.setblocking(1)
length = 2 + len(env);
if msg:
length += len(msg)
- sock.send(struct.pack("!IH", length, len(env)))
- sock.send(env)
+ ret = struct.pack("!IH", length, len(env))
+ ret += env
if msg:
- sock.send(msg)
+ ret += msg
+ return ret
+
+ def sendmsg(self, sock, env, msg = None):
+ sock.send(self.preparemsg(env, msg))
+
+ def send_prepared_msg(self, sock, msg):
+ sock.send(msg)
+
+ def newlname(self):
+ """Generate a unique connection identifier for this socket.
+ This is done by using an increasing counter and the current
+ time."""
+ self.connection_counter += 1
+ return "%x_%x@%s" % (time.time(), self.connection_counter, self.hostname)
def process_command_getlname(self, sock, routing, data):
- self.sendmsg(sock, { "type" : "getlname" }, { "lname" : "staticlname" })
+ env = { "type" : "getlname" }
+ reply = { "lname" : self.newlname() }
+ self.sendmsg(sock, env, reply)
+
+ def process_command_send(self, sock, routing, data):
+ group = routing["group"]
+ instance = routing["instance"]
+ if group == None or instance == None:
+ return # ignore invalid packets entirely
+ sockets = self.subs.find(group, instance)
+
+ msg = self.preparemsg(routing, data)
+
+ if sock in sockets:
+ sockets.remove(sock)
+ for socket in sockets:
+ self.send_prepared_msg(socket, msg)
+
+ def process_command_subscribe(self, sock, routing, data):
+ group = routing["group"]
+ instance = routing["instance"]
+ subtype = routing["subtype"]
+ if group == None or instance == None or subtype == None:
+ return # ignore invalid packets entirely
+ self.subs.subscribe(group, instance, sock)
+
+ def process_command_unsubscribe(self, sock, routing, data):
+ group = routing["group"]
+ instance = routing["instance"]
+ if group == None or instance == None:
+ return # ignore invalid packets entirely
+ self.subs.unsubscribe(group, instance, sock)
def run(self):
"""Process messages. Forever. Mostly."""
More information about the bind10-changes
mailing list