[svn] commit: r39 - in /experiments/graff-ccapi/ruby/lib: cc.rb cc/session.rb

BIND 10 source code commits bind10-changes at lists.isc.org
Sat Sep 26 05:23:00 UTC 2009


Author: mgraff
Date: Sat Sep 26 05:23:00 2009
New Revision: 39

Log:
add group functions; can do about 500 messages/sec without any optimizations.  Should be good enough for now.

Modified:
    experiments/graff-ccapi/ruby/lib/cc.rb
    experiments/graff-ccapi/ruby/lib/cc/session.rb

Modified: experiments/graff-ccapi/ruby/lib/cc.rb
==============================================================================
--- experiments/graff-ccapi/ruby/lib/cc.rb (original)
+++ experiments/graff-ccapi/ruby/lib/cc.rb Sat Sep 26 05:23:00 2009
@@ -48,18 +48,13 @@
 
   puts "Our local name: #{cc.lname}"
 
-  cc.sendmsg({ :type => "subscribe",
-               :group => "test", :instance => "foo",
-               :subtype => "normal",
-             })
-  cc.sendmsg({ :type => "send",
-               :from => cc.lname,
-               :to => "*",
-               :group => "test",
-               :instance => "foo",
-               :seq => "1",
-               :msg => CC::Message.to_wire({ :one => :two }),
-             })
-  puts cc.recvmsg.inspect
-  sleep 10
+  cc.group_subscribe("test")
+
+  counter = 0
+
+  while counter < 10000 do
+    cc.group_sendmsg({ :counter => counter }, "test", "foo")
+    routing, data = cc.group_recvmsg(false)
+    counter += 1
+  end
 end

Modified: experiments/graff-ccapi/ruby/lib/cc/session.rb
==============================================================================
--- experiments/graff-ccapi/ruby/lib/cc/session.rb (original)
+++ experiments/graff-ccapi/ruby/lib/cc/session.rb Sat Sep 26 05:23:00 2009
@@ -34,6 +34,7 @@
     @recvbuffer = ""   # data buffer for partial reads.
     @recvlength = nil  # if non-nil, we have a length to fill buffer to.
     @sendbuffer = ""   # pending output data.
+    @sequence = "a"    # per message sequence id, always unique
 
     options = {
       :host => "127.0.0.1",
@@ -46,7 +47,7 @@
     # Get our local name.
     #
     sendmsg({ :type => :getlname })
-    msg = recvmsg
+    msg = recvmsg(false)
     @lname = msg["lname"]
     if @lname.nil?
       raise CC::ProtocolError, "Could not get local name"
@@ -84,15 +85,13 @@
   # Send as much data as we can.  
   def send_pending
     return false if @sendbuffer.length == 0
-    puts "Sending #{@sendbuffer.length} bytes"
     sent = @socket.send(@sendbuffer, 0)
     @sendbuffer = @sendbuffer[sent .. -1]
-    puts "Send left #{@sendbuffer.length} bytes"
     @sendbuffer.length == 0 ? true : false
   end
 
-  def recvmsg
-    data = receive_full_buffer
+  def recvmsg(nonblock = true)
+    data = receive_full_buffer(nonblock)
     if data
       CC::Message::from_wire(data)
     else
@@ -100,21 +99,67 @@
     end
   end
 
+  def group_subscribe(group, instance = "*", subtype = "normal")
+    sendmsg({ :type => "subscribe",
+              :group => group,
+              :instance => instance,
+              :subtype => subtype,
+            })
+  end
+
+  def group_unsubscribe(group, instance = "*")
+    sendmsg({ :type => "unsubscribe",
+              :group => group,
+              :instance => instance,
+            })
+  end
+
+  def group_sendmsg(msg, group, instance = "*", to = "*")
+    sendmsg({ :type => "send",
+              :from => @lname,
+              :to => to,
+              :group => group,
+              :instance => instance,
+              :seq => next_sequence,
+              :msg => CC::Message.to_wire(msg),
+            })
+  end
+
+  def group_recvmsg(nonblock = true)
+    msg = recvmsg(nonblock)
+    return nil if msg.nil?
+    data = CC::Message::from_wire(msg["msg"])
+    msg.delete("msg")
+    return [data, msg]
+  end
+
   private
+
+  def next_sequence
+    @sequence.next!
+  end
 
   #
   # A rather tricky function.  If we have something waiting in our buffer,
   # and it will satisfy the current request, we will read it all in.  If
   # not, we will read only what we need to satisfy a single message.
   #
-  def receive_full_buffer
+  def receive_full_buffer(nonblock)
     # read the length prefix if we need it still.
     if @recvlength.nil?
-      puts "receiving length bytes"
       length = 4
       length -= @recvbuffer.length
-      data = @socket.recv(length)
-      return nil if data == 0
+      data = nil
+      begin
+        if nonblock
+          data = @socket.recv_nonblock(length)
+        else
+          data = @socket.recv(length)
+        end
+        rescue Errno::EINPROGRESS
+        rescue Errno::EAGAIN
+      end
+      return nil if data == nil
       @recvbuffer += data
       return nil if @recvbuffer.length < 4
       @recvlength = @recvbuffer.unpack('N')[0]
@@ -128,8 +173,16 @@
     #
     length = @recvlength - @recvbuffer.length
     while (length > 0) do
-      puts "receiving #{length} of #{@recvlength} bytes"
-      data = @socket.recv(length)
+      data = nil
+      begin
+        if nonblock
+          data = @socket.recv_nonblock(length)
+        else
+          data = @socket.recv(length)
+        end
+        rescue Errno::EINPROGRESS
+        rescue Errno::EAGAIN
+      end
       return nil if data == 0 # blocking I/O
       @recvbuffer += data
       length -= data.length




More information about the bind10-changes mailing list