[svn] commit: r39 - in /experiments/graff-ccapi/ruby/lib: cc.rb cc/session.rb
BIND 10 source code commits
bind10-changes at lists.isc.org
Sat Sep 26 05:23:00 UTC 2009
Author: mgraff
Date: Sat Sep 26 05:23:00 2009
New Revision: 39
Log:
Add group functions. The client can do about 500 messages/sec without any optimizations, which should be good enough for now.
Modified:
experiments/graff-ccapi/ruby/lib/cc.rb
experiments/graff-ccapi/ruby/lib/cc/session.rb
Modified: experiments/graff-ccapi/ruby/lib/cc.rb
==============================================================================
--- experiments/graff-ccapi/ruby/lib/cc.rb (original)
+++ experiments/graff-ccapi/ruby/lib/cc.rb Sat Sep 26 05:23:00 2009
@@ -48,18 +48,13 @@
puts "Our local name: #{cc.lname}"
- cc.sendmsg({ :type => "subscribe",
- :group => "test", :instance => "foo",
- :subtype => "normal",
- })
- cc.sendmsg({ :type => "send",
- :from => cc.lname,
- :to => "*",
- :group => "test",
- :instance => "foo",
- :seq => "1",
- :msg => CC::Message.to_wire({ :one => :two }),
- })
- puts cc.recvmsg.inspect
- sleep 10
+ cc.group_subscribe("test")
+
+ counter = 0
+
+ while counter < 10000 do
+ cc.group_sendmsg({ :counter => counter }, "test", "foo")
+ routing, data = cc.group_recvmsg(false)
+ counter += 1
+ end
end
Modified: experiments/graff-ccapi/ruby/lib/cc/session.rb
==============================================================================
--- experiments/graff-ccapi/ruby/lib/cc/session.rb (original)
+++ experiments/graff-ccapi/ruby/lib/cc/session.rb Sat Sep 26 05:23:00 2009
@@ -34,6 +34,7 @@
@recvbuffer = "" # data buffer for partial reads.
@recvlength = nil # if non-nil, we have a length to fill buffer to.
@sendbuffer = "" # pending output data.
+ @sequence = "a" # per message sequence id, always unique
options = {
:host => "127.0.0.1",
@@ -46,7 +47,7 @@
# Get our local name.
#
sendmsg({ :type => :getlname })
- msg = recvmsg
+ msg = recvmsg(false)
@lname = msg["lname"]
if @lname.nil?
raise CC::ProtocolError, "Could not get local name"
@@ -84,15 +85,13 @@
# Send as much data as we can.
def send_pending
return false if @sendbuffer.length == 0
- puts "Sending #{@sendbuffer.length} bytes"
sent = @socket.send(@sendbuffer, 0)
@sendbuffer = @sendbuffer[sent .. -1]
- puts "Send left #{@sendbuffer.length} bytes"
@sendbuffer.length == 0 ? true : false
end
- def recvmsg
- data = receive_full_buffer
+ def recvmsg(nonblock = true)
+ data = receive_full_buffer(nonblock)
if data
CC::Message::from_wire(data)
else
@@ -100,21 +99,67 @@
end
end
+ def group_subscribe(group, instance = "*", subtype = "normal")
+ sendmsg({ :type => "subscribe",
+ :group => group,
+ :instance => instance,
+ :subtype => subtype,
+ })
+ end
+
+ def group_unsubscribe(group, instance = "*")
+ sendmsg({ :type => "unsubscribe",
+ :group => group,
+ :instance => instance,
+ })
+ end
+
+ def group_sendmsg(msg, group, instance = "*", to = "*")
+ sendmsg({ :type => "send",
+ :from => @lname,
+ :to => to,
+ :group => group,
+ :instance => instance,
+ :seq => next_sequence,
+ :msg => CC::Message.to_wire(msg),
+ })
+ end
+
+ def group_recvmsg(nonblock = true)
+ msg = recvmsg(nonblock)
+ return nil if msg.nil?
+ data = CC::Message::from_wire(msg["msg"])
+ msg.delete("msg")
+ return [data, msg]
+ end
+
private
+
+ def next_sequence
+ @sequence.next!
+ end
#
# A rather tricky function. If we have something waiting in our buffer,
# and it will satisfy the current request, we will read it all in. If
# not, we will read only what we need to satisfy a single message.
#
- def receive_full_buffer
+ def receive_full_buffer(nonblock)
# read the length prefix if we need it still.
if @recvlength.nil?
- puts "receiving length bytes"
length = 4
length -= @recvbuffer.length
- data = @socket.recv(length)
- return nil if data == 0
+ data = nil
+ begin
+ if nonblock
+ data = @socket.recv_nonblock(length)
+ else
+ data = @socket.recv(length)
+ end
+ rescue Errno::EINPROGRESS
+ rescue Errno::EAGAIN
+ end
+ return nil if data == nil
@recvbuffer += data
return nil if @recvbuffer.length < 4
@recvlength = @recvbuffer.unpack('N')[0]
@@ -128,8 +173,16 @@
#
length = @recvlength - @recvbuffer.length
while (length > 0) do
- puts "receiving #{length} of #{@recvlength} bytes"
- data = @socket.recv(length)
+ data = nil
+ begin
+ if nonblock
+ data = @socket.recv_nonblock(length)
+ else
+ data = @socket.recv(length)
+ end
+ rescue Errno::EINPROGRESS
+ rescue Errno::EAGAIN
+ end
return nil if data == 0 # blocking I/O
@recvbuffer += data
length -= data.length
More information about the bind10-changes mailing list