[svn] commit: r2597 - in /experiments/stephen-receptionist: ./ client/ common/ common/test/ contractor/ intermediary/ receptionist/ scripts/ server/ worker/
BIND 10 source code commits
bind10-changes at lists.isc.org
Thu Jul 22 17:30:36 UTC 2010
Author: stephen
Date: Thu Jul 22 17:30:35 2010
New Revision: 2597
Log:
Added code for asynchronous tests
Added:
experiments/stephen-receptionist/client/client_controller_asynchronous.cc
experiments/stephen-receptionist/client/client_controller_asynchronous.h
experiments/stephen-receptionist/client/client_controller_synchronous.cc
experiments/stephen-receptionist/client/client_controller_synchronous.h
experiments/stephen-receptionist/client/token_bucket.cc
experiments/stephen-receptionist/client/token_bucket.h
experiments/stephen-receptionist/common/burst_server_controller.cc
experiments/stephen-receptionist/common/burst_server_controller.h
experiments/stephen-receptionist/common/debug.cc
experiments/stephen-receptionist/common/debug.h
experiments/stephen-receptionist/common/debug_flags.h
experiments/stephen-receptionist/common/packet_counter.cc
experiments/stephen-receptionist/common/packet_counter.h
experiments/stephen-receptionist/common/test/
experiments/stephen-receptionist/common/test/Makefile.am
experiments/stephen-receptionist/common/test/test.cc
experiments/stephen-receptionist/common/test/test_utilities.cc
experiments/stephen-receptionist/common/udp_buffer.cc
experiments/stephen-receptionist/scripts/common.sh (with props)
experiments/stephen-receptionist/scripts/run_all.sh (with props)
Removed:
experiments/stephen-receptionist/common/test_some_utilities.cpp
Modified:
experiments/stephen-receptionist/Makefile.am
experiments/stephen-receptionist/client/Makefile.am
experiments/stephen-receptionist/client/client.cc
experiments/stephen-receptionist/client/client_command.cc
experiments/stephen-receptionist/client/client_command.h
experiments/stephen-receptionist/client/client_communicator.h
experiments/stephen-receptionist/client/client_controller.cc
experiments/stephen-receptionist/client/client_controller.h
experiments/stephen-receptionist/client/logger.cc
experiments/stephen-receptionist/client/logger.h
experiments/stephen-receptionist/common/Makefile.am
experiments/stephen-receptionist/common/controller.h
experiments/stephen-receptionist/common/defaults.h
experiments/stephen-receptionist/common/exception.h
experiments/stephen-receptionist/common/msgq_communicator.cc
experiments/stephen-receptionist/common/target_command.cc
experiments/stephen-receptionist/common/udp_buffer.h
experiments/stephen-receptionist/common/udp_communicator.cc
experiments/stephen-receptionist/common/utilities.cc
experiments/stephen-receptionist/common/utilities.h
experiments/stephen-receptionist/configure.ac
experiments/stephen-receptionist/contractor/Makefile.am
experiments/stephen-receptionist/contractor/contractor.cc
experiments/stephen-receptionist/contractor/contractor_controller.cc
experiments/stephen-receptionist/intermediary/intermediary_controller.cc
experiments/stephen-receptionist/receptionist/receptionist_controller.cc
experiments/stephen-receptionist/scripts/client-contractor.sh
experiments/stephen-receptionist/scripts/client-server.sh
experiments/stephen-receptionist/scripts/client-worker.sh
experiments/stephen-receptionist/server/Makefile.am
experiments/stephen-receptionist/server/server.cc
experiments/stephen-receptionist/server/server_controller.cc
experiments/stephen-receptionist/server/server_controller.h
experiments/stephen-receptionist/worker/Makefile.am
experiments/stephen-receptionist/worker/worker.cc
experiments/stephen-receptionist/worker/worker_controller.cc
Modified: experiments/stephen-receptionist/Makefile.am
==============================================================================
--- experiments/stephen-receptionist/Makefile.am (original)
+++ experiments/stephen-receptionist/Makefile.am Thu Jul 22 17:30:35 2010
@@ -1,7 +1,11 @@
-SUBDIRS = common client contractor intermediary receptionist server worker
+SUBDIRS = common client contractor intermediary receptionist server worker common/test
+
+TESTS = common/test/test
dist_doc_DATA = README
dist_bin_SCRIPTS = scripts/client-contractor.sh
dist_bin_SCRIPTS += scripts/client-server.sh
dist_bin_SCRIPTS += scripts/client-worker.sh
+dist_bin_SCRIPTS += scripts/common.sh
+dist_bin_SCRIPTS += scripts/run_all.sh
Modified: experiments/stephen-receptionist/client/Makefile.am
==============================================================================
--- experiments/stephen-receptionist/client/Makefile.am (original)
+++ experiments/stephen-receptionist/client/Makefile.am Thu Jul 22 17:30:35 2010
@@ -5,9 +5,12 @@
AM_LDFLAGS = -lboost_program_options -lboost_system -lboost_thread
client_SOURCES = client.cc
-client_SOURCES += client_command.cc
-client_SOURCES += client_communicator.cc
-client_SOURCES += client_controller.cc
-client_SOURCES += logger.cc
+client_SOURCES += client_command.cc client_command.h
+client_SOURCES += client_communicator.cc client_communicator.h
+client_SOURCES += client_controller.cc client_controller.h
+client_SOURCES += client_controller_synchronous.cc client_controller_synchronous.h
+client_SOURCES += client_controller_asynchronous.cc client_controller_asynchronous.h
+client_SOURCES += logger.cc logger.h
+client_SOURCES += token_bucket.cc token_bucket.h
client_LDADD = $(top_srcdir)/common/libcommon.a
Modified: experiments/stephen-receptionist/client/client.cc
==============================================================================
--- experiments/stephen-receptionist/client/client.cc (original)
+++ experiments/stephen-receptionist/client/client.cc Thu Jul 22 17:30:35 2010
@@ -52,9 +52,10 @@
#include "client_command.h"
#include "client_communicator.h"
+#include "client_controller_asynchronous.h"
+#include "client_controller_synchronous.h"
#include "exception.h"
#include "logger.h"
-#include "client_controller.h"
int main(int argc, char**argv)
@@ -74,13 +75,19 @@
// Initialize the logging module
Logger logger(command.getLogfile(),
- command.getCount(), command.getPktsize(),
- command.getBurst());
+ command.getCount(), command.getSize(),
+ command.getBurst(), command.getMargin());
// Run the tests
- ClientController controller(command.getCount(), command.getBurst(),
- command.getPktsize());
- controller.run(communicator, logger);
+ if (command.isAsynchronous()) {
+ ClientControllerAsynchronous controller(command.getCount(),
+ command.getBurst(), command.getSize(), command.getMargin());
+ controller.run(communicator, logger);
+ } else {
+ ClientControllerSynchronous controller(command.getCount(),
+ command.getBurst(), command.getSize());
+ controller.run(communicator, logger);
+ }
// Print the results
logger.log();
Modified: experiments/stephen-receptionist/client/client_command.cc
==============================================================================
--- experiments/stephen-receptionist/client/client_command.cc (original)
+++ experiments/stephen-receptionist/client/client_command.cc Thu Jul 22 17:30:35 2010
@@ -22,6 +22,7 @@
#include "client_command.h"
#include "defaults.h"
+#include "debug.h"
namespace po = boost::program_options;
@@ -34,8 +35,8 @@
// parsed, hence the initialization to zero values here.
ClientCommand::ClientCommand(int argc, char** argv) :
- address_(""), count_(0), logfile_(""), pktsize_(0), port_(0),
- desc_("Usage: client [options...]"), vm_(), burst_(0)
+ address_(""), count_(0), logfile_(""), size_(0), port_(0),
+ desc_("Usage: client [options...]"), vm_(), burst_(0), margin_(0)
{
// Parse the command line
@@ -62,31 +63,47 @@
// Set up the command-line options
+ uint32_t dbglevel; // Debug level
+
desc_.add_options()
("help", "produce help message")
("address,a",
po::value<std::string>(&address_)->default_value(CL_DEF_ADDRESS),
"IP address (V4 or V6) of the server system")
+ ("asynchronous,y",
+ "Specify for asynchronous tests; the default is synchronous")
("burst,b",
- po::value<uint32_t>(&burst_)->default_value(CL_DEF_BURST),
- "Burst size used on the server (this is logged, not used)")
+ po::value<long>(&burst_)->default_value(CL_DEF_BURST),
+ "Burst size: for asynchronous runs, this is logged but not used")
("count,c",
- po::value<uint32_t>(&count_)->default_value(CL_DEF_COUNT),
+ po::value<long>(&count_)->default_value(CL_DEF_COUNT),
"Count of packets to send")
+ ("debug,d",
+ po::value<uint32_t>(&dbglevel)->default_value(CL_DEF_DEBUG),
+ "Debug level")
("logfile,l",
po::value<std::string>(&logfile_)->default_value(CL_DEF_LOGFILE),
"File to which statistics are logged")
- ("pktsize,s",
- po::value<uint16_t>(&pktsize_)->default_value(CL_DEF_PKTSIZE),
+ ("margin,m",
+ po::value<long>(&margin_)->default_value(CL_DEF_LOST),
+ "Maximum allowed lost packets in asynchronous tests")
+ ("size,k",
+ po::value<uint16_t>(&size_)->default_value(CL_DEF_PKTSIZE),
"Size of each packet (max = 65,536)")
("port,p",
po::value<uint16_t>(&port_)->default_value(CL_DEF_PORT),
- "Port on server to which packets are sent");
+ "Port on server to which packets are sent")
+ ("synchronous,s",
+ "Specify for synchronous tests (this is the default)");
// Parse
po::store(po::command_line_parser(argc, argv).options(desc_).run(), vm_);
po::notify(vm_);
+ // Set the debug level.
+
+ Debug::setLevel(dbglevel);
+
return;
}
Modified: experiments/stephen-receptionist/client/client_command.h
==============================================================================
--- experiments/stephen-receptionist/client/client_command.h (original)
+++ experiments/stephen-receptionist/client/client_command.h Thu Jul 22 17:30:35 2010
@@ -25,7 +25,6 @@
/// boost::program_options library.
#include <stdint.h>
-
#include <string>
#include <iostream>
@@ -69,21 +68,30 @@
bool getHelp() const {
return vm_.count("help");
}
-
/// \return The name of the log file.
std::string getLogfile() const {
return logfile_;
}
+ /// \return Number of packets an asynchronous test can have outstanding
+ uint32_t getMargin() const {
+ return margin_;
+ }
+
/// \return The size of the packets to send.
- uint16_t getPktsize() const {
- return pktsize_;
+ uint16_t getSize() const {
+ return size_;
}
/// \return The port on the server to which to connect.
uint16_t getPort() const {
return port_;
+ }
+
+ /// Return whether the test is synchronous or asynchronous
+ bool isAsynchronous() const {
+ return vm_.count("asynchronous");
}
private:
@@ -93,17 +101,18 @@
///
/// \param argc Size of the command-line argument array.
/// \param argv Command-line argument array.
- /// \param vm BOOST program options variable map. On exit this will hold
- /// information from the parsed command line.
void parseCommandLine(int argc, char** argv);
private:
std::string address_; //< IP address of the server
- uint32_t burst_; //< Burst size (for record only)
- uint32_t count_; //< Total number of packets to send
+ bool asynchronous_; //< true if asynchronous mode
+ long burst_; //< Burst size.
+ long count_; //< Total number of packets to send
std::string logfile_; //< Name of the log file
- uint16_t pktsize_; //< Maximum size of each packet
+ long margin_; //< Max allowed lost packets (async only)
+ uint16_t size_; //< Maximum size of each packet
uint16_t port_; //< Port number on server to which to connect
+
boost::program_options::options_description desc_; //< Options description structure
boost::program_options::variables_map vm_; //< Maps variables to values
};
Modified: experiments/stephen-receptionist/client/client_communicator.h
==============================================================================
--- experiments/stephen-receptionist/client/client_communicator.h (original)
+++ experiments/stephen-receptionist/client/client_communicator.h Thu Jul 22 17:30:35 2010
@@ -14,14 +14,12 @@
// $Id$
-#ifndef __COMMUNICATOR_H
-#define __COMMUNICATOR_H
+#ifndef __CLIENT_COMMUNICATOR_H
+#define __CLIENT_COMMUNICATOR_H
#include <string>
#include <boost/asio.hpp>
-#include <boost/shared_ptr.hpp>
-#include <boost/thread.hpp>
#include <boost/utility.hpp>
/// \brief Communcates with the server
@@ -90,4 +88,4 @@
boost::asio::ip::udp::endpoint endpoint_; // Target endpoint for comms
};
-#endif // __COMMUNICATOR_H
+#endif // __CLIENT_COMMUNICATOR_H
Modified: experiments/stephen-receptionist/client/client_controller.cc
==============================================================================
--- experiments/stephen-receptionist/client/client_controller.cc (original)
+++ experiments/stephen-receptionist/client/client_controller.cc Thu Jul 22 17:30:35 2010
@@ -28,127 +28,45 @@
#include "client_communicator.h"
#include "client_controller.h"
+#include "debug.h"
#include "defaults.h"
#include "logger.h"
+#include "packet_counter.h"
#include "utilities.h"
+
// Runs the tests and records the results in the logger.
void
ClientController::run(ClientCommunicator& communicator, Logger& logger) {
- // Set up for the test. The packet size is rounded up to a four-byte
- // boundary, and not allowed to be more than the 64-bytes less than the
- // maximum UDP packet size. (A bit arbitrary, but allows for overhead.)
- size_t packet_size = ((pktsize_ + 3) / 4) * 4;
- packet_size = std::min(packet_size, (UDP_BUFFER_SIZE - 64));
- pktsize_ = boost::numeric_cast<uint16_t>(packet_size);
- logger.setPktsize(pktsize_);
-
+ logger.setSize(size_);
setUp();
- // Start the timer.
- logger.start();
-
// Actually execute the test
- runTest(communicator);
-
- // Stop the timer and write it out.
- logger.end();
+ runTest(communicator, logger);
return;
}
-
// Sets up the data for the test.
void
ClientController::setUp() {
- // Allocate the buffers.
+ // Initialize the data in the packet being sent.
+ snd_buffer_.init(size_);
- snd_buffer_ = boost::shared_array<uint8_t>(new uint8_t[UDP_BUFFER_SIZE]);
- rcv_buffer_ = boost::shared_array<uint8_t>(new uint8_t[UDP_BUFFER_SIZE]);
+ // Now calculate the CRC check and put onto the end of the send buffer.
+ uint32_t crc = Utilities::Crc(snd_buffer_.data(), snd_buffer_.dataSize());
+ snd_buffer_.setCrc(crc);
- // Fill the send buffer with data and calculate the checksum.
+ // Duplicate the template send buffer for use as the receive buffer.
+ // The data areas will be separate.
+ rcv_buffer_ = snd_buffer_.clone();
- for (unsigned int i = 0; i < pktsize_; ++i) {
- snd_buffer_[i] = boost::numeric_cast<uint8_t>(i % 256);
- }
-
- // Now calculate the CRC check and put onto the end of the buffer.
- // We'll send pktsize_ bytes of data, but compare pktsize_+4. Note
- // that the check in the constructor ensures that we have this space.
- Utilities::Crc(snd_buffer_.get(), pktsize_);
+ // Initialize for counting packets (this initializes static counters).
+ PacketCounter counter;
return;
}
-
-
-
-// Sender task. Sends up to "npacket" packets as fast as it can.
-void
-ClientController::sendTask(ClientCommunicator& communicator, int npacket) {
-
- // Send the packets as fast as we can. The send() method will block when
- // the outstanding packet limit is reached.
-
- for (int i = 0; i < npacket; ++i) {
- communicator.send(snd_buffer_.get(), pktsize_);
- }
-
- return;
-}
-
-
-
-// Receiver task. Receives "npacket" packets as fast as it can and compares
-// the received packets with the sent ones (including CRC). Prints an
-// error if there is a mismatch.
-void
-ClientController::receiveTask(ClientCommunicator& communicator, int npacket) {
-
- for (int i = 0; i < npacket; ++i) {
-
- // Zero the receive buffer before issuing the receive to counter
- // the case where the receive fails but the buffer is left
- // unchanged.
- memset(rcv_buffer_.get(), 0, UDP_BUFFER_SIZE);
- size_t received = communicator.receive(rcv_buffer_.get(),
- UDP_BUFFER_SIZE);
-
- // The received packet should be the same as the sent packet but
- // with the addition of a CRC.
- if (received != (pktsize_ + 4)) {
- std::cout << "Received packet wrong size: expected " << (pktsize_ + 4)
- << ", received " << received << std::endl;
- } else if (memcmp(snd_buffer_.get(), rcv_buffer_.get(), received) != 0) {
- std::cout << "Received packet differs from sent packet: ";
- if (memcmp(snd_buffer_.get() + pktsize_, rcv_buffer_.get() + pktsize_, 4) != 0) {
- std::cout << "checksum is different" << std::endl;
- } else {
- std::cout << "contents are different" << std::endl;
- }
- }
- }
-}
-
-
-// Runs the test. Sends all the packets in groups of "burst" packets. If
-// the total number of packets is not a multiple of "burst", the number of
-// packets is rounded up to the next multiple.
-void
-ClientController::runTest(ClientCommunicator& communicator) {
-
- int packets = count_;
- while (packets > 0) {
-
- // Send and receive a burst of packets
-
- sendTask(communicator, burst_);
- receiveTask(communicator, burst_);
- packets -= burst_;
- }
-
- return;
-}
Modified: experiments/stephen-receptionist/client/client_controller.h
==============================================================================
--- experiments/stephen-receptionist/client/client_controller.h (original)
+++ experiments/stephen-receptionist/client/client_controller.h Thu Jul 22 17:30:35 2010
@@ -21,17 +21,14 @@
#include "client_communicator.h"
#include "logger.h"
+#include "udp_buffer.h"
/// \brief Runs the Tests
///
-/// The ClientController class in the client is the module that actually carries out
-/// the tests. It creates a buffer of the specified size and fills it with
-/// random data. It then enters a loop, sending and receiving data. On
-/// every packet received, the following checks are done:
-/// - Data is 4 bytes longer than sent
-/// - Data received is same as data sent (except those four bytes)
-/// - Last 4-byte word is the correct checksum
+/// The ClientController is the base for the synchronous and asynchronous
+/// controller classes that run the tests. It merely handles the storing of
+/// parameters and the setup of data.
class ClientController : private boost::noncopyable {
public:
@@ -41,55 +38,45 @@
/// Stores the parameters of the test.
/// \param count Number of packets to send in the test.
/// \patam burst Burst size of the communication
- /// \param pktsize Size of each packet.
- ClientController(uint32_t count, uint32_t burst, uint16_t pktsize) :
- count_(count), burst_(burst), pktsize_(pktsize)
+ /// \param size Size of each packet.
+ /// \param margin Margin to allow for asynchronous loss
+ ClientController(long count, long burst, uint16_t size, long margin) :
+ count_(count), burst_(burst), size_(size), margin_(margin),
+ snd_buffer_(), rcv_buffer_()
{}
+ virtual ~ClientController() {}
- /// \brief Runs the test
+ /// \brief Runs the Test
///
- /// Runs the test by constructing the data arrays, then looping, sending
- /// and receiving data. Data is sent in bursts (the size of each burst
- /// being specified in the constructor). After each burst, the client waits
- /// for the packets to be returned before sending the next set of packets.
- /// At the end of the test, information is appended to the logfile.
+ /// Records the packet size in the logger, calls setUp() to initialize the
+ /// data and then calls runTest(); run() does not itself start the clock.
+ /// \param communicator Communicator to be used for the test.
+ /// \param logger Logger given the packet size and passed on to runTest()
+ virtual void run(ClientCommunicator& communicator, Logger& logger);
+
+ /// \brief Runs the Class-Specific test
+ ///
+ /// Pure virtual method that must be overridden by each subclass controller.
/// \param communicator Communicator to be used for the test.
/// \param logger Logger object to be used for the test.
- void run(ClientCommunicator& communicator, Logger& logger);
+ virtual void runTest(ClientCommunicator& communicator, Logger& logger) = 0;
- /// \brief Sending task
- ///
- /// This sends the given number of packets to the server.
- /// \param communicator Interface to the I/O system
- /// \param npacket Number of packets to send
- void sendTask(ClientCommunicator& communicator, int npacket);
-
- /// \brief Receiving task
- ///
- /// Receives the specified number of packets from the server and compares
- /// the contents with the packets sent.
- /// \param communicator Interface to the I/O system
- /// \param Number of packets expected
- void receiveTask(ClientCommunicator& communicator, int npacket);
-
-
-private:
/// \brief Sets up for the test
///
/// Allocates the buffers and initialises the data.
- void setUp();
+ virtual void setUp();
- /// \brief Runs the test
- ///
- /// Actually performs the testing and logging of the result.
- void runTest(ClientCommunicator& communicator);
+protected:
+ uint16_t size_; //< Size of each packet (max 64K)
-private:
- uint16_t pktsize_; //< Size of each packet
- uint32_t burst_; //< Burst size of the test
- uint32_t count_; //< Total number of packets to send
- boost::shared_array<uint8_t> snd_buffer_; //< Data being sent
- boost::shared_array<uint8_t> rcv_buffer_; //< Data being received
+ // The following three elements are "long" (rather than unsigned long)
+ // as they may be used in calculations involving a signed number.
+
+ long burst_; //< Burst size of the test
+ long count_; //< Total number of packets to send
+ long margin_; //< Margin packets (asynch mode only)
+ UdpBuffer snd_buffer_; //< Data being sent
+ UdpBuffer rcv_buffer_; //< Data being received
};
#endif // __CLIENT_CONTROLLER_H
Modified: experiments/stephen-receptionist/client/logger.cc
==============================================================================
--- experiments/stephen-receptionist/client/logger.cc (original)
+++ experiments/stephen-receptionist/client/logger.cc Thu Jul 22 17:30:35 2010
@@ -52,30 +52,27 @@
void
Logger::logInfo(std::ostream& output)
{
- output <<
- boost::posix_time::to_iso_extended_string(tstart_) << "," <<
- boost::posix_time::to_iso_extended_string(tend_) << "," <<
- count_ << "," << pktsize_ << "," << burst_;
+ output << boost::posix_time::to_iso_extended_string(tstart_)
+ << "," << boost::posix_time::to_iso_extended_string(tend_)
+ << "," << count_ << "," << size_
+ << "," << burst_ << "," << margin_ << "," << lost_;
// Calculate the difference in time and print interval and time per
- // packet.
+ // packet. (Note that as used, this is the time to receive count_ packets;
+ // if "lost" is greater than zero, more than count_ packets will have been
+ // sent to get to this figure.)
boost::posix_time::time_duration interval = tend_ - tstart_;
double dinterval = interval.total_seconds() +
(interval.total_microseconds() / 1.0e6);
- output << "," << dinterval << "," << (dinterval / count_);
-
- // See if there is a comment and if so, log that
- if (comment_.length() != 0) {
-
- // Replace embedded quotes with double quotes
- std::string modified_comment =
- boost::algorithm::replace_all_copy(comment_, "\"", "\"\"");
-
- // ... and output surrounded by quotes
- output << ",\"" << modified_comment << "\"";
- }
- output << "\n";
+ output << "," << dinterval << "," << (dinterval / count_) << "\n";
return;
}
+
+// Stream output for the logger
+
+std::ostream& operator<<(std::ostream& output, Logger& logger) {
+ logger.logInfo(output);
+ return output;
+}
Modified: experiments/stephen-receptionist/client/logger.h
==============================================================================
--- experiments/stephen-receptionist/client/logger.h (original)
+++ experiments/stephen-receptionist/client/logger.h Thu Jul 22 17:30:35 2010
@@ -31,7 +31,7 @@
///
/// The format of this file is:
///
-/// Start, End, Size, Count, Burst, Elapsed, Average [, "Comment"]
+/// Start, End, Count, Size, Burst, Margin, Lost, Elapsed, Average
///
/// Where
/// - Start is the date/time at which the first packet was sent, in the format
@@ -39,19 +39,32 @@
/// - End is the date/time the last packet was received, in the same format.
/// - Size is the size of each data packet sent in bytes.
/// - Count is the total number of packets sent.
+///
+/// Then come three fields that indicate how the test was conducted.
/// - Burst is the number of packets the server must receive before
/// processing them (i.e. the server processes the packets in blocks of
-/// "burst").
+/// "burst"). In the synchronous test, the client sends "burst" packets,
+/// then waits to receive the same number before sending the next set. The
+/// use of this parameter in asynchronous tests is described in "Margin"
+/// below.
+/// - Margin is ignored in synchronous tests (and is output as zero). In
+/// asynchronous tests, up to "burst" packets are sent to the server before
+/// the client *must* stop sending and await a packet in return. This causes
+/// problems if a packet is lost, e.g. if "burst" is set to 4, and the fourth
+/// packet doesn't reach the server, the server will not do anything until
+/// it receives another packet and the client will not send any more packets
+/// until it receives one from the server. "margin" is the number of packets
+/// to account for this - the client may have up to "burst" + "margin"
+/// packets outstanding before it must stop and await a reply from the server.
+/// - Lost is the number of lost packets reported by the client. Each packet
+/// is numbered and sent in sequence; the client receive thread detects
+/// missing sequence numbers and counts the number of lost packets.
+///
///
/// The next two fields are derived fields:
/// - Elapsed is the difference between start and end times (in seconds).
/// - Average is the average elapsed time per packet, given by the total
/// elapsed time divided by the number of packets.
-///
-/// Finally there is an optional field:
-/// - Comment is a comment associated with the entry. The comment is written
-/// surrounted by double-quate characters, with any embedded double-quote
-/// characters doubled. (e.g. the string abc"def is written as "abc""def".)
class Logger {
public:
@@ -62,13 +75,13 @@
/// \param logfile Name of the file to which the log is written. If null,
/// the output is written to stdout.
/// \param count Number of packets sent in the test (default = 0).
- /// \param pktsize Size of each packet (default = 0).
+ /// \param size Size of each packet (default = 0).
/// \param burst Burst size of the packet (default = 0)
/// \param comment Optional comment (default = no comment).
- Logger(const std::string& logfile = "", int count = 0, int pktsize = 0,
- int burst = 0, const std::string& comment = "") :
- logfile_(logfile), count_(count), pktsize_(pktsize),
- burst_(burst), comment_(comment), tstart_(), tend_()
+ Logger(const std::string& logfile = "", int count = 0, int size = 0,
+ int burst = 0, int margin = 0) :
+ logfile_(logfile), count_(count), size_(size), burst_(burst),
+ margin_(margin), lost_(0), tstart_(), tend_()
{}
// Access methods.
@@ -100,14 +113,14 @@
/// \brief Sets the packet size
///
- /// \param pktsize Size of packets to be sent.
- void setPktsize(int pktsize) {
- pktsize_ = pktsize;
+ /// \param size Size of packets to be sent.
+ void setSize(int size) {
+ size_ = size;
}
/// \return Current record of packet size
- int getPktsize() const {
- return (pktsize_);
+ int getSize() const {
+ return (size_);
}
/// \brief Sets the burst size
@@ -123,20 +136,30 @@
return burst_;
}
- /// \brief Set comment associated with the test
+ /// \brief Sets the lost packet margin
///
- /// \param comment Comment to be associated with the text. Any embedded
- /// quotes are doubled, and the comment will be written with surrounding
- /// quotes. Set to the empty string ("") to disable commenting.
- void setComment(const std::string& comment) {
- comment_ = comment;
+ /// \param margin Number of packets in addition to "burst" that can be
+ /// outstanding in asynchronous tests.
+ void setMargin(int margin) {
+ margin_ = margin;
}
- /// \return Current comment to be associated with the test
- std::string getComment() const {
- return (comment_);
+ /// \return Current record of margin
+ int getMargin() const {
+ return margin_;
}
+ /// \brief Sets the count of lost packets
+ ///
+ /// \param lost Number of packets lost in the test
+ void setLost(int lost) {
+ lost_ = lost;
+ }
+
+ /// \return Current count of lost packets
+ int getLost() const {
+ return lost_;
+ }
/// \brief Logs the start time.
void start() {
tstart_ = boost::posix_time::microsec_clock::universal_time();
@@ -150,8 +173,6 @@
/// \brief Writes the log entry
void log();
-private:
-
/// \brief formats and writes the log entry
///
/// log() opens the log file and calls this method to format and write
@@ -162,11 +183,14 @@
private:
std::string logfile_; //< Current log file
int count_; //< Count of packets in this test
- int pktsize_; //< Packet size
+ int size_; //< Packet size
int burst_; //< Burst size
- std::string comment_; //< Optional comment
+ int margin_; //< Margin for lost packets
+ int lost_; //< Count of lost packets
boost::posix_time::ptime tstart_; //< Start time of test
boost::posix_time::ptime tend_; //< End time of test
};
+std::ostream& operator<<(std::ostream& output, Logger& logger);
+
#endif // __LOGGER_H
Modified: experiments/stephen-receptionist/common/Makefile.am
==============================================================================
--- experiments/stephen-receptionist/common/Makefile.am (original)
+++ experiments/stephen-receptionist/common/Makefile.am Thu Jul 22 17:30:35 2010
@@ -1,9 +1,13 @@
lib_LIBRARIES = libcommon.a
-libcommon_a_SOURCES = msgq_communicator.cc
-libcommon_a_SOURCES += target_command.cc
-libcommon_a_SOURCES += udp_communicator.cc
-libcommon_a_SOURCES += utilities.cc
+libcommon_a_SOURCES = debug.cc debug.h debug_flags.h
+libcommon_a_SOURCES += burst_server_controller.cc burst_server_controller.h
+libcommon_a_SOURCES += msgq_communicator.cc msgq_communicator.h
+libcommon_a_SOURCES += packet_counter.cc packet_counter.h
+libcommon_a_SOURCES += target_command.cc target_command.h
+libcommon_a_SOURCES += udp_buffer.cc udp_buffer.h
+libcommon_a_SOURCES += udp_communicator.cc udp_communicator.h
+libcommon_a_SOURCES += utilities.cc utilities.h
AM_LDFLAGS = -lboost_system
Modified: experiments/stephen-receptionist/common/controller.h
==============================================================================
--- experiments/stephen-receptionist/common/controller.h (original)
+++ experiments/stephen-receptionist/common/controller.h Thu Jul 22 17:30:35 2010
@@ -21,8 +21,7 @@
/// \brief Basic Controller Class
///
-/// This class is the base for all the controller classes, and controls the
-/// action of the processes that send and receive packets synchronously.
+/// This class is the base for all the controller classes.
class Controller {
public:
Modified: experiments/stephen-receptionist/common/defaults.h
==============================================================================
--- experiments/stephen-receptionist/common/defaults.h (original)
+++ experiments/stephen-receptionist/common/defaults.h Thu Jul 22 17:30:35 2010
@@ -30,7 +30,9 @@
static const std::string CL_DEF_ADDRESS = "127.0.0.1";
static const uint32_t CL_DEF_BURST = 1;
static const uint32_t CL_DEF_COUNT = 256;
+static const uint32_t CL_DEF_DEBUG = 0;
static const std::string CL_DEF_LOGFILE = "";
+static const uint32_t CL_DEF_LOST = 4;
static const uint32_t CL_DEF_OUTSTANDING = 8;
static const uint32_t CL_DEF_PERCENT = 100;
static const uint16_t CL_DEF_PKTSIZE = 8192;
@@ -43,7 +45,7 @@
/// \brief Miscellaneous
-static const size_t UDP_BUFFER_SIZE = 65536;
+static const size_t UDP_BUFFER_SIZE = 65536 - 64; // Allows for UDP overhead
/// \brief Message Queue Names
Modified: experiments/stephen-receptionist/common/exception.h
==============================================================================
--- experiments/stephen-receptionist/common/exception.h (original)
+++ experiments/stephen-receptionist/common/exception.h Thu Jul 22 17:30:35 2010
@@ -32,7 +32,7 @@
public:
/// \brief Constructs an exception with no reason.
- Exception() : reason_("") {}
+ Exception(const char* why = "") : reason_(why) {}
/// \brief Constructs an exception giving a reason.
/// \param why Reason for the exception.
@@ -70,8 +70,20 @@
class Timeout : public Exception {
public:
- Timeout() : Exception("Timeout has expired") {}
- Timeout(const char* why) : Exception(why) {}
+ Timeout(const char* why = "Timeout has expired") : Exception(why)
+ {}
+};
+
+/// \brief Exception thrown if a data packet is too large
+///
+/// Thrown when a data packet is constructed if the size requested is above
+/// the maximum.
+
+class PacketTooLarge : public Exception {
+public:
+ PacketTooLarge(const char* why = "Requested packet size is too large") :
+ Exception(why)
+ {}
};
#endif // __EXCEPTION_H
Modified: experiments/stephen-receptionist/common/msgq_communicator.cc
==============================================================================
--- experiments/stephen-receptionist/common/msgq_communicator.cc (original)
+++ experiments/stephen-receptionist/common/msgq_communicator.cc Thu Jul 22 17:30:35 2010
@@ -26,20 +26,28 @@
#include "msgq_communicator.h"
#include "defaults.h"
#include "exception.h"
-//#include "types.h"
#include "udp_buffer.h"
// Opens the connection to the network, in this case associating with the
-// message queues.
+// message queues. The queue is deleted before it is created; if it already
+// exists, it may have some messages in it that we are not interested in.
+//
+// Note that as two programs (p1 and p2) use the message queue, both must
+// be stopped and restarted for the deletions to be effective; if both are
+// running, the sequence stop p1, start p1, stop p2, start p2 is not effective -
+// the message queue is never closed so the deletion will fail.
void
MsgqCommunicator::open() {
+// (void) boost::interprocess::message_queue::remove(rcv_name_.c_str());
rcv_queue_ = mq_ptr(
new boost::interprocess::message_queue(
boost::interprocess::open_or_create,
rcv_name_.c_str(), QUEUE_MAX_MESSAGE_COUNT, QUEUE_MAX_MESSAGE_SIZE)
);
+
+// (void) boost::interprocess::message_queue::remove(snd_name_.c_str());
snd_queue_ = mq_ptr(
new boost::interprocess::message_queue(
boost::interprocess::open_or_create,
@@ -55,8 +63,8 @@
MsgqCommunicator::send(UdpBuffer& buffer) {
snd_queue_->send(
- buffer.data.get(), // Data element
- buffer.size, // Amount of data transferred
+ buffer.data(), // Data element
+ buffer.size(), // Amount of data transferred
QUEUE_PRIORITY); // Message priority
return;
@@ -67,16 +75,15 @@
UdpBuffer
MsgqCommunicator::receive() {
- boost::shared_array<uint8_t> data = // Buffer to receive data
- boost::shared_array<uint8_t>(new uint8_t[UDP_BUFFER_SIZE]);
- size_t received_size; // Amount of data received
- unsigned int priority; // Priority of receiv3ed data
+ UdpBuffer buffer; // Buffer to receive data
+ size_t received_size; // Amount of data received
+ unsigned int priority; // Priority of received data
- rcv_queue_->receive(data.get(), UDP_BUFFER_SIZE, received_size,
+ rcv_queue_->receive(buffer.data(), buffer.capacity(), received_size,
priority);
+ buffer.setSize(received_size);
- // return the buffer, creating a dummy endpoint in the process.
- return (UdpBuffer(boost::asio::ip::udp::endpoint(), received_size, UDP_BUFFER_SIZE, data));
+ return (buffer);
}
// Closes down the message queues by deleting them. This may not work if
Modified: experiments/stephen-receptionist/common/target_command.cc
==============================================================================
--- experiments/stephen-receptionist/common/target_command.cc (original)
+++ experiments/stephen-receptionist/common/target_command.cc Thu Jul 22 17:30:35 2010
@@ -23,6 +23,7 @@
#include "target_command.h"
#include "defaults.h"
+#include "debug.h"
namespace po = boost::program_options;
@@ -55,10 +56,12 @@
// Parse the command line and return a variable map with all the options set.
-// The method does not validate the string arguments (--address and --logfile)
+// The module automatically sets the debug level - there is no need for the
+// caller to worry about that.
void
TargetCommand::parseCommandLine(int argc, char** argv) {
+ uint32_t debug; // Debug level
// Set up the command-line options
@@ -67,6 +70,9 @@
("burst,b",
po::value<uint32_t>(&burst_)->default_value(CL_DEF_BURST),
"Burst size: number of packets processed at one time")
+ ("debug,d",
+ po::value<uint32_t>(&debug)->default_value(CL_DEF_DEBUG),
+ "Debug level: a value of 0 disables debug messages")
("port,p",
po::value<uint16_t>(&port_)->default_value(CL_DEF_PORT),
"Port on which to listen");
@@ -76,5 +82,9 @@
po::store(po::command_line_parser(argc, argv).options(desc_).run(), vm_);
po::notify(vm_);
+ // ... and handle options that we can cope with internally.
+
+ Debug::setLevel(debug);
+
return;
}
Modified: experiments/stephen-receptionist/common/udp_buffer.h
==============================================================================
--- experiments/stephen-receptionist/common/udp_buffer.h (original)
+++ experiments/stephen-receptionist/common/udp_buffer.h Thu Jul 22 17:30:35 2010
@@ -14,26 +14,261 @@
// $Id$
-#ifndef __UDP_BUFFER_H
-#define __UDP_BUFFER_H
-
+#ifndef __DATA_PACKET_H
+#define __DATA_PACKET_H
+
+#include <stdint.h>
+#include <arpa/inet.h>
#include <boost/asio.hpp>
#include <boost/shared_array.hpp>
-/// \brief UDP Buffer Class
-///
-/// Holds information about data received from/sent to a UDP socket.
-struct UdpBuffer {
+#include <algorithm>
+
+#include "defaults.h"
+#include "exception.h"
+
+
+/// \brief Udp Buffer
+///
+/// Encapsulates the data sent between the various processes in the receptionist
+/// tests.
+///
+/// The packet layout is fixed and comprises
+///
+/// - (size bytes - 14) (where size is the requested size): the data area
+/// - 4 bytes: CRC of the data area (in network byte order)
+/// - 4 bytes: Packet number (in network byte order)
+/// - 4 bytes: IPV4 address (in network byte order)
+/// - 2 bytes: IPV4 port (in network byte order)
+///
+/// The interpretation of the data - which fields are relevant etc. - is up
+/// to the program.
+///
+/// When constructed, the size of the packet that may be specified is no
+/// more than UDP_BUFFER_SIZE - 14;
+
+class UdpBuffer {
public:
- boost::asio::ip::udp::endpoint endpoint; //< Socket end point
- size_t size; //< Amount of data in buffer
- size_t capacity; //< Maximum size of buffer
- boost::shared_array<uint8_t> data; //< Data in the buffer
-
- UdpBuffer(boost::asio::ip::udp::endpoint ep, size_t amount,
- size_t max_amount, boost::shared_array<uint8_t> contents) :
- endpoint(ep), size(amount), capacity(max_amount), data(contents)
- {}
+
+ /// \brief Creates the packet
+ ///
+ /// All packets are created with a capacity equal to UDP_BUFFER_SIZE.
+ /// The amount of data can be set via setSize() or implicitly by init().
+ UdpBuffer() : size_(0), capacity_(UDP_BUFFER_SIZE)
+ {
+ if (capacity_ > UDP_BUFFER_SIZE) {
+ throw PacketTooLarge();
+ }
+
+ data_ = boost::shared_array<uint8_t>(new uint8_t[UDP_BUFFER_SIZE]);
+ }
+
+ /// The default copy and assignment constructors are OK here although
+ /// "A = B" does mean that A and B will point to the same data. To do
+ /// a deep copy, use clone.
+ UdpBuffer clone() const;
+
+ /// \brief Initialize Packet
+ ///
+ /// Initializes the data part of the packet to a sequence of increasing
+ /// integers.
+ void init(size_t size) {
+
+ // Silently ensure that at least one byte of data will be sent,
+ // but that the buffer can hold the data and overhead.
+ size_ = std::min(std::max(size, static_cast<size_t>(15)),
+ UDP_BUFFER_SIZE);
+ for (int i = 0; i < dataSize(); ++i) {
+ data_.get()[i] = (i % 256);
+ }
+ }
+
+ /// \return Pointer to data area. This is only valid for as long as the
+ /// UdpBuffer object is in existence.
+ uint8_t* data() {
+ return data_.get();
+ }
+
+ /// \return Pointer to data area. This is only valid for as long as the
+ /// UdpBuffer object is in existence.
+ const uint8_t* data() const {
+ return data_.get();
+ }
+
+ /// \return Size of the packet
+ size_t size() const {
+ return (size_);
+ }
+
+ /// \return Size of the data in the packet
+ size_t dataSize() const {
+ return (size() - 14);
+ }
+
+ /// \return Capacity of the packet
+ size_t capacity() const {
+ return (capacity_);
+ }
+
+ /// \brief Clear the buffer
+ ///
+ /// Zeroes the contents of the buffer.
+ void zero() {
+ memset(data_.get(), 0, capacity());
+ }
+
+ /// \brief Sets Size
+ ///
+ /// If the buffer is used to receive data, the amount of data received is
+ /// set here. As the data is assumed to follow the layout here, the
+ /// size() and dataSize() methods will subsequently return correct values.
+ /// \param size Amount of data received.
+ void setSize(size_t size) {
+ size_ = size;
+ }
+
+ // The following methods get and set data in the packet. No constants
+ // are defined for offsets (though they should be), but they are:
+ //
+ // 0 Data area of the packet
+ // size_ - 14 CRC
+ // size_ - 10 Packet number
+ // size_ - 6 IPV4 address
+ // size_ - 2 Port
+
+ /// \brief Set CRC
+ ///
+ /// \param crc CRC to set. Supplied in host byte order this is stored in
+ /// network byte order.
+ void setCrc(uint32_t crc) {
+ setData(size() - 14, htonl(crc));
+ }
+
+ /// \return CRC of the data packet in host byte order.
+ uint32_t getCrc() const {
+ uint32_t crc;
+ getData(size() - 14, crc);
+ return ntohl(crc);
+ }
+
+ /// \brief Set Packet Number
+ ///
+ /// \param number Number to set the packet number to. This is provided
+ /// in host byte order but stored in network byte order.
+ void setPacketNumber(uint32_t number) {
+ setData(size() - 10, htonl(number));
+ }
+
+ /// \return Packet number in host byte order
+ uint32_t getPacketNumber() const {
+ uint32_t number;
+ getData(size() - 10, number);
+ return ntohl(number);
+ }
+
+ /// \brief Set IPV4 Address
+ ///
+ /// \param address IPV4 address in host byte order (stored in network byte
+ /// order).
+ void setAddress(uint32_t address) {
+ setData(size() - 6, htonl(address));
+ }
+
+ /// \return IPV4 address in host byte order
+ uint32_t getAddress() const {
+ uint32_t address;
+ getData(size() - 6, address);
+ return ntohl(address);
+ }
+
+ /// \brief Set IPV4 Port
+ ///
+ /// \param port Port number in host byte order (stored in network byte
+ /// order).
+ void setPort(uint16_t port) {
+ setData(size() - 2, htons(port));
+ }
+
+ /// \return Port number in host byte order
+ uint16_t getPort() const {
+ uint16_t port;
+ getData(size() - 2, port);
+ return ntohs(port);
+ }
+
+ // Endpoint Access
+
+ /// \brief Set Endpoint Information
+ ///
+ /// Given a UDP endpoint, add the information to the data packet.
+ /// \param ep Endpoint holding requested information
+ void setAddressInfo(const boost::asio::ip::udp::endpoint& ep);
+
+ /// \return Endpoint of Data Packet
+ boost::asio::ip::udp::endpoint getAddressInfo() const;
+
+
+ // Comparison functions
+
+ /// \return If the data in two packets is the same
+ bool compareData(const UdpBuffer& other) const {
+ return (
+ (size() == other.size()) &&
+ (memcmp(static_cast<const void*>(data_.get()),
+ static_cast<const void*>(other.data_.get()),
+ dataSize()) == 0)
+ );
+ }
+
+ /// \return If the CRCs are the same
+ bool compareCrc(const UdpBuffer& other) {
+ return (getCrc() == other.getCrc());
+ }
+
+
+
+private:
+
+ /// \brief Get data from packet as a fundamental data type
+ ///
+ /// \param T template data type required
+ /// \param offset Offset into the data packet for the data
+ /// \param result Data requested. Supplied as a parameter for template
+ /// signature reasons.
+ template <typename T> void getData(size_t offset, T& result) const {
+ memmove(static_cast<void*>(&result),
+ static_cast<const void*>(&(data_.get()[offset])), sizeof(T));
+ return;
+ }
+
+ /// \brief Set data in packet from fundamental data type
+ ///
+ /// \param T template data type required
+ /// \param offset Offset into the data packet for the data
+ /// \param value Data to be inserted into the packet
+ template <typename T> void setData(size_t offset, const T& value) {
+ memmove(static_cast<void*>(&(data_.get()[offset])),
+ static_cast<const void*>(&value), sizeof(T));
+ return;
+ }
+
+private:
+
+ // Although vector contains a size() method, to prevent reallocation
+ // of memory the size of the vector is initialized on construction and
+ // never changed; we keep tabs on how much is used. Therefore:
+ //
+ // data_.size() is always equal to the amount requested plus 6.
+ // size_ is the amount of data in the packet, and is equal to data_.size()
+ // if the address and port information has been added to the packet, and
+ // data_.size() - 6 if not.
+ //
+
+ boost::asio::ip::udp::endpoint endpoint_; //< Socket end point
+ size_t size_; //< Amount of data in buffer
+ size_t capacity_; //< Maximum size of buffer
+ boost::shared_array<uint8_t> data_; //< Data in the buffer
+
};
-#endif // __UDP_BUFFER_H
+#endif // __DATA_PACKET_H
Modified: experiments/stephen-receptionist/common/udp_communicator.cc
==============================================================================
--- experiments/stephen-receptionist/common/udp_communicator.cc (original)
+++ experiments/stephen-receptionist/common/udp_communicator.cc Thu Jul 22 17:30:35 2010
@@ -15,6 +15,7 @@
// $Id$
#include <iostream>
+#include <iomanip>
#include <sstream>
#include <boost/system/error_code.hpp>
@@ -25,6 +26,8 @@
namespace ip = boost::asio::ip;
+#include "debug.h"
+#include "debug_flags.h"
#include "defaults.h"
#include "exception.h"
#include "udp_buffer.h"
@@ -66,11 +69,18 @@
void
UdpCommunicator::send(UdpBuffer& buffer) {
try {
+ if (Debug::flagSet(DebugFlags::PRINT_IP)) {
+ std::cout << "Sending to IP address "
+ << std::hex << buffer.getAddress()
+ << ", port " << std::hex << buffer.getPort()
+ << std::endl;
+ }
+
// Send the data.
size_t sent = socket_ptr_->send_to(
- boost::asio::buffer(buffer.data.get(), buffer.size),
- buffer.endpoint);
- if (sent != buffer.size) {
+ boost::asio::buffer(buffer.data(), buffer.size()),
+ buffer.getAddressInfo());
+ if (sent != buffer.size()) {
throw Exception("Not all data passed to send() was sent");
}
} catch (boost::system::system_error& e) {
@@ -89,18 +99,29 @@
throw Exception("Attempt to read from unbound socket");
}
- size_t received = 0; // Bytes received
- boost::shared_array<uint8_t> data = // Buffer to receive data
- boost::shared_array<uint8_t>(new uint8_t[UDP_BUFFER_SIZE]);
- ip::udp::endpoint sender_endpoint; // Where the packet came from
+ UdpBuffer buffer; // Buffer to receive data
try {
- received = socket_ptr_->receive_from(
- boost::asio::buffer(data.get(), UDP_BUFFER_SIZE), sender_endpoint);
+ ip::udp::endpoint sender_endpoint; // Where the packet came from
+
+ // Receive the data and encode where the data came from into the
+ // data packet.
+ size_t received = socket_ptr_->receive_from(
+ boost::asio::buffer(buffer.data(), buffer.capacity()),
+ sender_endpoint);
+ buffer.setSize(received);
+ buffer.setAddressInfo(sender_endpoint);
+
+ if (Debug::flagSet(DebugFlags::PRINT_IP)) {
+ std::cout << "Received from IP address "
+ << std::hex << buffer.getAddress()
+ << ", port " << std::hex << buffer.getPort()
+ << std::endl;
+ }
} catch (boost::system::system_error& e) {
throw Exception("Error on socket receive", e.what());
}
- return (UdpBuffer(sender_endpoint, received, UDP_BUFFER_SIZE, data));
+ return (buffer);
}
// Closes down the socket
Modified: experiments/stephen-receptionist/common/utilities.cc
==============================================================================
--- experiments/stephen-receptionist/common/utilities.cc (original)
+++ experiments/stephen-receptionist/common/utilities.cc Thu Jul 22 17:30:35 2010
@@ -14,12 +14,12 @@
// $Id$
+#include <algorithm>
#include <iostream>
#include <arpa/inet.h>
#include <boost/asio.hpp>
#include <boost/crc.hpp>
-#include <boost/foreach.hpp>
#include "exception.h"
#include "utilities.h"
@@ -28,113 +28,18 @@
namespace ip = boost::asio::ip;
// Calculates CRC and appends to end of the buffer
-void
+uint32_t
Utilities::Crc(uint8_t* buffer, size_t size) {
- union {
- uint8_t bytes[4]; // Byte representation of the CRC
- uint32_t integer; // 32-bit word representation of the CRC
- } result;
+ uint32_t crc; // CRC check
// Calculate the CRC
boost::crc_32_type calculator;
calculator.reset();
calculator.process_bytes(buffer, size);
- result.integer = calculator.checksum();
+ crc = calculator.checksum();
- // Ensure result is in network byte order and copy to the buffer.
- result.integer = htonl(result.integer);
-
- for (int i = 0; i < 4; ++i) {
- buffer[size + i] = result.bytes[i];
- }
-
- return;
-}
-
-// Extracts UDP endpoint data and appends to the buffer.
-void
-Utilities::AppendEndpoint(UdpBuffer& buffer) {
-
- union {
- uint8_t bytes[4];
- uint32_t ulong;
- } address;
- union {
- uint8_t bytes[2];
- uint16_t ushort;
- } port;
-
- // Check we have enough space for the information.
- if ((buffer.capacity - buffer.size) < (4 + 2)) {
- throw Exception("Not enough space in UdpBuffer for endpoint information");
- }
-
- // Get the address asssociated with the endpoint and convert to network
- // byte order.
- ip::address ep_address = buffer.endpoint.address();
- if (! ep_address.is_v4()) {
- throw Exception("Endpoint address is not IP V4 address");
- }
- address.ulong = htonl(ep_address.to_v4().to_ulong());
-
- // Do the same for the port.
- port.ushort = htons(buffer.endpoint.port());
-
- // Copy to the end of the buffer.
- for (int i = 0; i < 4; ++i) {
- buffer.data[buffer.size + i] = address.bytes[i];
- }
-
- for (int i = 0; i < 2; ++i) {
- buffer.data[buffer.size + 4 + i] = port.bytes[i];
- }
-
- // Update the buffer size
- buffer.size += 6;
-
- return;
-}
-
-// Extracts UDP endpoint data and puts in the endpoint structure
-void
-Utilities::ExtractEndpoint(UdpBuffer& buffer) {
-
- union {
- uint8_t bytes[4];
- uint32_t ulong;
- } address;
- union {
- uint8_t bytes[2];
- uint16_t ushort;
- } port;
-
- // Check there is enough information
- if (buffer.size < (4 + 2)) {
- throw Exception("Not enough data in UdpBuffer for endpoint information");
- }
-
- // Extract the address information and convert to host byte order.
- for (int i = 0; i < 4; ++i) {
- address.bytes[i] = buffer.data[buffer.size - 6 + i];
- }
- address.ulong = ntohl(address.ulong);
-
- // ... and the port
- for (int i = 0; i < 2; ++i) {
- port.bytes[i] = buffer.data[buffer.size - 2 + i];
- }
- port.ushort = ntohs(port.ushort);
-
- // Update the buffer size to reflect less information.
- buffer.size -= 6;
-
- // Construct the endpoint with the information given and put in the
- // UdpBuffer object.
- buffer.endpoint = ip::udp::endpoint(ip::address_v4(address.ulong),
- port.ushort);
-
- return;
+ return crc;
}
// Prints endpoint data to the specified stream
@@ -149,3 +54,45 @@
<< "\n";
}
+/*
+// Conversion between a larger data type and a byte array
+
+template<typename T> void
+Utilities::Convert(T from, uint8_t* to) {
+ union {
+ T word;
+ uint8_t byte[sizeof(T)];
+ } cvt;
+
+ cvt.word = from;
+ std::copy(cvt.byte, cvt.byte + sizeof(T), to);
+
+ return;
+}
+
+// ... and instantiate for uint16_t and uint32_t.
+
+template void Utilities::Convert(const uint32_t from, uint8_t* to);
+template void Utilities::Convert(const uint16_t from, uint8_t* to);
+
+
+// Conversion between a byte array and a larger type.
+
+template<typename T> void
+Utilities::Convert(const uint8_t* from, T& to) {
+ union {
+ T word;
+ uint8_t byte[sizeof(T)];
+ } cvt;
+
+ std::copy(from, from + sizeof(T), cvt.byte);
+ to = cvt.word;
+
+ return;
+}
+
+// ... and instantiate for uint16_t and uint32_t.
+
+template void Utilities::Convert(const uint8_t* from, uint32_t& to);
+template void Utilities::Convert(const uint8_t* from, uint16_t& to);
+*/
Modified: experiments/stephen-receptionist/common/utilities.h
==============================================================================
--- experiments/stephen-receptionist/common/utilities.h (original)
+++ experiments/stephen-receptionist/common/utilities.h Thu Jul 22 17:30:35 2010
@@ -18,6 +18,7 @@
#define __UTILITIES_H
#include <stdint.h>
+#include <string.h>
#include "udp_buffer.h"
@@ -28,38 +29,13 @@
class Utilities {
public:
- /// \brief Calculates CRC and appends to end of buffer
+ /// \brief Calculates CRC
///
/// Calculates the CRC32 checksum of an array of data and appends the result
/// (in network byte order) to the end of the data.
- /// \param buffer Data buffer. It is assumed to be sized to (size+4)
- /// \param size Amount of data in the buffer. The buffer is assumed to be
- /// four bytes longer to contain the CRC checksum.
- static void Crc(uint8_t* buffer, size_t size);
-
- /// \brief Append Endpoint
- ///
- /// Many of the programs require that both the data and the location to
- /// which the data should be sent to be transferred as a string of bytes.
- ///
- /// This method takes a UdpBuffer elements, extracts the UDP address and
- /// port number, and appends it (in network byte order) to the data.
- /// The address is assumed to be IPV4, so six bytes (address and port) are
- /// appended to the data and the data count adjusted accordingly.
- ///
- /// \exception Exception Thrown if there is not enough space to append the
- /// endpoint.
- static void AppendEndpoint(UdpBuffer& buffer);
-
- /// \brief Extract Endpoint
- ///
- /// Does the reverse of AppendEndpoint. The last six bytes of the buffer
- /// are assumed to contain the IP address and port number of the endpoint
- /// (in network byte order). The data is extracted and put into the
- /// endpoint structure in the buffer.
- ///
- /// \exception Exception Thrown if there is no enough data in the buffer.
- static void ExtractEndpoint(UdpBuffer& buffer);
+ /// \param buffer Data buffer.
+ /// \param size Amount of data in the buffer.
+ static uint32_t Crc(uint8_t* buffer, size_t size);
/// \brief Prints endpoint information
///
@@ -69,6 +45,24 @@
/// \param endpoint Endpoint for which information is required.
static void PrintEndpoint(std::ostream& output,
boost::asio::ip::udp::endpoint endpoint);
+
+ /// \brief Converts from byte array
+ ///
+ /// Copies a sequence of bytes into a larger data type.
+ /// \param from Byte array to copy
+ /// \param to Larger data type into which to copy array
+ template <typename T> static void CopyBytes(const uint8_t* from, T& to) {
+ memmove(static_cast<void*>(&to), static_cast<const void*>(from), sizeof(T));
+ }
+
+ /// \brief Converts to byte array
+ ///
+ /// Copies the contents of a large data type into a sequence of bytes
+ /// \param from Data to copy,
+ /// \param to Byte array into which to copy the data
+ template <typename T> static void CopyBytes(const T& from, uint8_t* to) {
+ memmove(static_cast<void*>(to), static_cast<const void*>(&from), sizeof(T));
+ }
};
#endif // __UTILITIES_H
Modified: experiments/stephen-receptionist/configure.ac
==============================================================================
--- experiments/stephen-receptionist/configure.ac (original)
+++ experiments/stephen-receptionist/configure.ac Thu Jul 22 17:30:35 2010
@@ -25,6 +25,7 @@
AC_CONFIG_FILES([Makefile
common/Makefile
+ common/test/Makefile
client/Makefile
contractor/Makefile
intermediary/Makefile
Modified: experiments/stephen-receptionist/contractor/Makefile.am
==============================================================================
--- experiments/stephen-receptionist/contractor/Makefile.am (original)
+++ experiments/stephen-receptionist/contractor/Makefile.am Thu Jul 22 17:30:35 2010
@@ -7,4 +7,3 @@
contractor_LDADD = $(top_srcdir)/common/libcommon.a
contractor_SOURCES = contractor.cc
-contractor_SOURCES += contractor_controller.cc
Modified: experiments/stephen-receptionist/contractor/contractor.cc
==============================================================================
--- experiments/stephen-receptionist/contractor/contractor.cc (original)
+++ experiments/stephen-receptionist/contractor/contractor.cc Thu Jul 22 17:30:35 2010
@@ -34,7 +34,7 @@
#include <iostream>
#include <iomanip>
-#include "contractor_controller.h"
+#include "burst_server_controller.h"
#include "defaults.h"
#include "exception.h"
#include "msgq_communicator.h"
@@ -58,7 +58,7 @@
communicator.open();
// Create the task controller.
- ContractorController controller(command.getBurst());
+ BurstServerController controller(command.getBurst());
// ... and enter the run loop.
Modified: experiments/stephen-receptionist/contractor/contractor_controller.cc
==============================================================================
--- experiments/stephen-receptionist/contractor/contractor_controller.cc (original)
+++ experiments/stephen-receptionist/contractor/contractor_controller.cc Thu Jul 22 17:30:35 2010
@@ -21,7 +21,9 @@
#include "communicator.h"
#include "contractor_controller.h"
+#include "debug.h"
#include "defaults.h"
+#include "packet_counter.h"
#include "udp_buffer.h"
#include "utilities.h"
@@ -31,37 +33,27 @@
ContractorController::run(Communicator& downstream_communicator,
Communicator& upstream_communicator)
{
+ PacketCounter counter;
- int num = 0;
- for (;;) { // Forever
+ while (true) { // Forever
// Receive the burst of packets and put them onto the queue
for (int i = 0; i < burst_; ++i) {
+ Debug::log(counter.incrementReceive(), "Calling receive");
UdpBuffer buffer = downstream_communicator.receive();
queue_.push_back(buffer);
}
- // Calculate the checksums.
+ // Calculate the CRCs
std::list<UdpBuffer>::iterator li;
for (li = queue_.begin(); li != queue_.end(); ++li) {
-
- // Calculate CRC if enough capacity
- if ((li->capacity - li->size) >= 4) {
-
- // Remove UDP endpoint information and store in buffer
- Utilities::ExtractEndpoint(*li);
-
- // Calculate CRC
- Utilities::Crc((li->data).get(), li->size);
- li->size += 4;
-
- // Append endpoint enformation again.
- Utilities::AppendEndpoint(*li);
- }
+ uint32_t crc = Utilities::Crc(li->data(), li->dataSize());
+ li->setCrc(crc);
}
// Now return the packets back to the sender.
for (li = queue_.begin(); li != queue_.end(); ++li) {
+ Debug::log(counter.incrementSend(), "Calling send");
downstream_communicator.send(*li);
}
queue_.clear();
Modified: experiments/stephen-receptionist/intermediary/intermediary_controller.cc
==============================================================================
--- experiments/stephen-receptionist/intermediary/intermediary_controller.cc (original)
+++ experiments/stephen-receptionist/intermediary/intermediary_controller.cc Thu Jul 22 17:30:35 2010
@@ -20,7 +20,9 @@
#include <utility>
#include "communicator.h"
+#include "debug.h"
#include "intermediary_controller.h"
+#include "packet_counter.h"
#include "udp_buffer.h"
#include "utilities.h"
@@ -28,25 +30,44 @@
// Starts the intermediary task. Never returns.
void
IntermediaryController::run(Communicator& client_communicator,
- Communicator& contractor_communicator) {
+ Communicator& contractor_communicator)
+{
+ PacketCounter counter;
- for (;;) { // Forever
+ std::list<UdpBuffer> queue; // Packet queue...
+ std::list<UdpBuffer>::iterator li; // and its iterator
- // Receive the burst of packets, append the UDP information and
- // put them onto the message queue to the processor.
+ while (true) { // Forever
+
+ // Receive the burst of packets from the client and put them
+ // into a local queue.
for (int i = 0; i < burst_; ++i) {
- UdpBuffer data = client_communicator.receive();
- Utilities::AppendEndpoint(data);
- contractor_communicator.send(data);
+ Debug::log(counter.incrementReceive(), "Receiving data from client");
+ UdpBuffer buffer = client_communicator.receive();
+ queue.push_back(buffer);
}
- // Now read the packets off the incoming queue, extract the
- // endpoint information, and pass them back to the client.
+ // Forward them to the contractor via the message queue.
+ for (li = queue.begin(); li != queue.end(); ++li) {
+ Debug::log(counter.incrementSend(), "Sending data to contractor");
+ contractor_communicator.send(*li);
+ }
+ queue.clear();
+
+ // Now read the packets from the contractor and store in the queue.
for (int i = 0; i < burst_; ++i) {
- UdpBuffer data = contractor_communicator.receive();
- Utilities::ExtractEndpoint(data);
- client_communicator.send(data);
+ Debug::log(counter.incrementReceive2(),
+ "Receiving data from contractor");
+ UdpBuffer buffer = contractor_communicator.receive();
+ queue.push_back(buffer);
}
+
+ // ... and forward them back to the client.
+ for (li = queue.begin(); li != queue.end(); ++li) {
+ Debug::log(counter.incrementSend2(), "Sending data to client");
+ client_communicator.send(*li);
+ }
+ queue.clear();
}
return;
Modified: experiments/stephen-receptionist/receptionist/receptionist_controller.cc
==============================================================================
--- experiments/stephen-receptionist/receptionist/receptionist_controller.cc (original)
+++ experiments/stephen-receptionist/receptionist/receptionist_controller.cc Thu Jul 22 17:30:35 2010
@@ -20,25 +20,39 @@
#include <utility>
#include "communicator.h"
-#include "defaults.h"
+#include "debug.h"
+#include "packet_counter.h"
#include "receptionist_controller.h"
#include "udp_buffer.h"
#include "utilities.h"
-// Starts the receptionist task. Never returns.
+// Starts the receptionist task. This receives packets and holds them
+// until it has "burst" packets, then sends them on.
void
ReceptionistController::run(Communicator& client_communicator,
Communicator& processor_communicator) {
- // Just loop, forwarding packets to the worker.
+ PacketCounter counter;
- for (;;) { // Forever
+ std::list<UdpBuffer> queue;
+ while (true) { // Forever
- UdpBuffer data = client_communicator.receive();
- Utilities::AppendEndpoint(data);
- processor_communicator.send(data);
+ // Receive the packets.
+ for (int i = 0; i < burst_; ++i) {
+ Debug::log(counter.incrementReceive(), "Calling receive");
+ UdpBuffer data = client_communicator.receive();
+ queue.push_back(data);
+ }
+
+ // Send the packets onwards.
+ for (std::list<UdpBuffer>::iterator li = queue.begin();
+ li != queue.end(); ++li) {
+ Debug::log(counter.incrementSend(), "Calling send");
+ processor_communicator.send(*li);
+ }
+ queue.clear();
}
return;
Modified: experiments/stephen-receptionist/scripts/client-contractor.sh
==============================================================================
--- experiments/stephen-receptionist/scripts/client-contractor.sh (original)
+++ experiments/stephen-receptionist/scripts/client-contractor.sh Thu Jul 22 17:30:35 2010
@@ -20,29 +20,15 @@
# Runs a series of tests with varying parameters on the client-intermediary-
# contractor model. The output is written to the file given as the only
# parameter.
+#
+# \param -a If specified, run asynchronously
+# \param logfile Name of the logfile to which to write the results
progdir=`dirname $0`
-if [ $# != 1 ]; then
- echo "Usage: client-contractor logfile"
+if [ $# -lt 1 -o $# -gt 2 ]; then
+ echo "Usage: client-contractor [-a] logfile"
exit 1;
fi
-for burst in 1 2 4 8 16 32 64 96 128 160 192 224 256
-do
- echo "Setting burst to $burst"
- $progdir/intermediary --burst $burst &
- $progdir/contractor --burst $burst &
- sleep 3 # Allow worker to start
-
- for rpt in {1..64}
- do
- a="$progdir/client --count 4096 --burst $burst --pktsize 256 --logfile $1"
- echo $a
- $a
- done
-
- kill %?contractor
- kill %?intermediary
- sleep 3
-done
+$progdir/common.sh $* intermediary contractor
Modified: experiments/stephen-receptionist/scripts/client-server.sh
==============================================================================
--- experiments/stephen-receptionist/scripts/client-server.sh (original)
+++ experiments/stephen-receptionist/scripts/client-server.sh Thu Jul 22 17:30:35 2010
@@ -19,27 +19,15 @@
#
# Runs a series of tests with varying parameters on the client-server model.
# The output is written to the logfile named on the command line.
+#
+# \param -a If specified, run asynchronously
+# \param logfile Name of the logfile to which to write the results
progdir=`dirname $0`
-if [ $# != 1 ]; then
- echo "Usage: client-server logfile"
+if [ $# -lt 1 -o $# -gt 2 ]; then
+ echo "Usage: client-server [-a] logfile"
exit 1;
fi
-for burst in 1 2 4 8 16 32 64 96 128 160 192 224 256
-do
- echo "Setting burst to $burst"
- $progdir/server --burst $burst &
- sleep 3 # Allow worker to start
-
- for rpt in {1..64}
- do
- a="$progdir/client --count 4096 --burst $burst --pktsize 256 --logfile $1"
- echo $a
- $a
- done
-
- kill %?server
- sleep 3
-done
+$progdir/common.sh $* server
Modified: experiments/stephen-receptionist/scripts/client-worker.sh
==============================================================================
--- experiments/stephen-receptionist/scripts/client-worker.sh (original)
+++ experiments/stephen-receptionist/scripts/client-worker.sh Thu Jul 22 17:30:35 2010
@@ -19,29 +19,15 @@
#
# Runs a series of tests with varying parameters on the client-receptionist-
# worker model. The output is written to the logfile named on the command line.
+#
+# \param -a If specified, run asynchronously
+# \param logfile Name of the logfile to which to write the results
progdir=`dirname $0`
-if [ $# != 1 ]; then
- echo "Usage: client-worker logfile"
+if [ $# -lt 1 -o $# -gt 2 ]; then
+ echo "Usage: client-worker [-a] logfile"
exit 1;
fi
-for burst in 1 2 4 8 16 32 64 96 128 160 192 224 256
-do
- echo "Setting burst to $burst"
- $progdir/worker --burst $burst &
- $progdir/receptionist --burst $burst &
- sleep 3 # Allow worker to start
-
- for rpt in {1..64}
- do
- a="$progdir/client --count 4096 --burst $burst --pktsize 256 --logfile $1"
- echo $a
- $a
- done
-
- kill %?receptionist
- kill %?worker
- sleep 3
-done
+$progdir/common.sh $* receptionist worker
Modified: experiments/stephen-receptionist/server/Makefile.am
==============================================================================
--- experiments/stephen-receptionist/server/Makefile.am (original)
+++ experiments/stephen-receptionist/server/Makefile.am Thu Jul 22 17:30:35 2010
@@ -7,4 +7,3 @@
server_LDADD = $(top_srcdir)/common/libcommon.a
server_SOURCES = server.cc
-server_SOURCES += server_controller.cc
Modified: experiments/stephen-receptionist/server/server.cc
==============================================================================
--- experiments/stephen-receptionist/server/server.cc (original)
+++ experiments/stephen-receptionist/server/server.cc Thu Jul 22 17:30:35 2010
@@ -36,7 +36,7 @@
#include <iomanip>
#include "target_command.h"
-#include "server_controller.h"
+#include "burst_server_controller.h"
#include "udp_communicator.h"
#include "exception.h"
@@ -57,7 +57,7 @@
communicator.open();
// Create the task controller.
- ServerController controller(command.getBurst());
+ BurstServerController controller(command.getBurst());
// ... and enter the run loop.
Modified: experiments/stephen-receptionist/server/server_controller.cc
==============================================================================
--- experiments/stephen-receptionist/server/server_controller.cc (original)
+++ experiments/stephen-receptionist/server/server_controller.cc Thu Jul 22 17:30:35 2010
@@ -19,39 +19,40 @@
#include <list>
#include <utility>
-#include "defaults.h"
-#include "utilities.h"
#include "communicator.h"
+#include "debug.h"
+#include "packet_counter.h"
#include "server_controller.h"
#include "udp_buffer.h"
+#include "utilities.h"
// Starts the server task. Never returns.
void
ServerController::run(Communicator& downstream_communicator,
- Communicator& upstream_communicator) {
+ Communicator& upstream_communicator)
+{
+ PacketCounter counter;
- int num = 0;
- for (;;) { // Forever
+ while (true) { // Forever
// Receive the burst of packets and put them onto the queue
for (int i = 0; i < burst_; ++i) {
+ Debug::log(counter.incrementReceive(), "Calling receive");
UdpBuffer buffer = downstream_communicator.receive();
queue_.push_back(buffer);
}
- // Calculate the checksums.
+ // Calculate the CRC and put it in the packet.
std::list<UdpBuffer>::iterator li;
for (li = queue_.begin(); li != queue_.end(); ++li) {
- if ((li->capacity - li->size) >= 4) {
- Utilities::Crc((li->data).get(), li->size);
- li->size += 4;
- }
+ uint32_t crc = Utilities::Crc(li->data(), li->dataSize());
+ li->setCrc(crc);
}
- // Now return the packets back to the sender after appending a
- // checksum to each.
+ // Now return the packets back to the sender
for (li = queue_.begin(); li != queue_.end(); ++li) {
+ Debug::log(counter.incrementSend(), "Calling send");
downstream_communicator.send(*li);
}
queue_.clear();
Modified: experiments/stephen-receptionist/server/server_controller.h
==============================================================================
--- experiments/stephen-receptionist/server/server_controller.h (original)
+++ experiments/stephen-receptionist/server/server_controller.h Thu Jul 22 17:30:35 2010
@@ -23,6 +23,7 @@
#include "controller.h"
#include "communicator.h"
+#include "udp_buffer.h"
/// \brief Server Controller
///
Modified: experiments/stephen-receptionist/worker/Makefile.am
==============================================================================
--- experiments/stephen-receptionist/worker/Makefile.am (original)
+++ experiments/stephen-receptionist/worker/Makefile.am Thu Jul 22 17:30:35 2010
@@ -7,4 +7,3 @@
worker_LDADD = $(top_srcdir)/common/libcommon.a
worker_SOURCES = worker.cc
-worker_SOURCES += worker_controller.cc
Modified: experiments/stephen-receptionist/worker/worker.cc
==============================================================================
--- experiments/stephen-receptionist/worker/worker.cc (original)
+++ experiments/stephen-receptionist/worker/worker.cc Thu Jul 22 17:30:35 2010
@@ -43,7 +43,7 @@
#include "target_command.h"
#include "msgq_communicator.h"
#include "udp_communicator.h"
-#include "worker_controller.h"
+#include "burst_server_controller.h"
#include "exception.h"
#include <string.h>
@@ -69,7 +69,7 @@
client_communicator.open();
// Create the task controller.
- WorkerController controller(command.getBurst());
+ BurstServerController controller(command.getBurst());
// ... and enter the run loop.
Modified: experiments/stephen-receptionist/worker/worker_controller.cc
==============================================================================
--- experiments/stephen-receptionist/worker/worker_controller.cc (original)
+++ experiments/stephen-receptionist/worker/worker_controller.cc Thu Jul 22 17:30:35 2010
@@ -20,7 +20,9 @@
#include <utility>
#include "communicator.h"
+#include "debug.h"
#include "defaults.h"
+#include "packet_counter.h"
#include "udp_buffer.h"
#include "utilities.h"
#include "worker_controller.h"
@@ -32,30 +34,27 @@
WorkerController::run(Communicator& receptionist_communicator,
Communicator& client_communicator)
{
- int num = 0;
+ PacketCounter counter;
+
for (;;) { // Forever
// Receive the burst of packets and put them onto the queue
for (int i = 0; i < burst_; ++i) {
+ Debug::log(counter.incrementReceive(), "Calling receive");
UdpBuffer buffer = receptionist_communicator.receive();
queue_.push_back(buffer);
}
- // Calculate the CRCs. Since the packets include UDP endpoint
- // information, we extract it before doing the calculation.
+ // Calculate the CRCs.
std::list<UdpBuffer>::iterator li;
for (li = queue_.begin(); li != queue_.end(); ++li) {
-
- // Remove UDP endpoint information and store in buffer
- Utilities::ExtractEndpoint(*li);
-
- // Calculate CRC
- Utilities::Crc((li->data).get(), li->size);
- li->size += 4;
+ uint32_t crc = Utilities::Crc(li->data(), li->dataSize());
+ li->setCrc(crc);
}
// Now return the packets back to the client.
for (li = queue_.begin(); li != queue_.end(); ++li) {
+ Debug::log(counter.incrementSend(), "Calling send");
client_communicator.send(*li);
}
queue_.clear();
More information about the bind10-changes
mailing list