Table of Contents
List of Examples
MRNet is a customizable, high-throughput communication software system for parallel tools and applications with a master/slave architecture. MRNet reduces the cost of these tools' activities by incorporating a tree of processes between the tool's front-end and back-ends. MRNet uses these internal processes to distribute many important tool activities, reducing analysis time and keeping tool front-end loads manageable.
MRNet-based tools send data between front-end and back-ends on logical flows of data called streams. MRNet internal processes use filters to synchronize and aggregate data sent to the tool's front-end. Using filters to manipulate data in parallel as it passes through the network, MRNet can efficiently compute averages, sums, and other more complex aggregations on back-end data.
Several features make MRNet especially well-suited as a general facility for building scalable parallel tools:
Table of Contents
For this discussion, $MRNET_ROOT
is the location of the
top-level directory of the MRNet distribution and
$MRNET_ARCH
is a string describing the platform (OS and
architecture) as discovered by autoconf. For the installation instructions, it is
assumed that the current working directory is $MRNET_ROOT
.
MRNet has been developed to be highly portable; we expect it to run properly on all common Unix-based as well as Microsoft Windows platforms. This being said, we have successfully built and tested MRNet on the following systems:
Our build system attempts to use native system compilers where appropriate, for instance, xlc and xlC in AIX environments.
Here we list the third party tools that MRNet uses and needs for proper installation:
MRNet uses GNU autoconf to discover the platform specific configuration parameters. The script that does this auto-configuration is called configure.
UNIX>
./configure --help
shows all possible options of the command. Below, we display the MRNet-specific ones:
--with-libfldir Directory containing flex library
./configure without any options should give reasonable results, but the user may specify certain options. For example,
UNIX>
./configure CXX=g++ CXXFLAGS=-O3 --with-libfldir=/usr/local/lib
instructs the configure script to use g++ for the C++
compiler with level 3 optimization and /usr/local/lib/libfl.a
as the location of the flex library.
To build the MRNet toolkit by type:
UNIX>
make
After a successful build, the following files will be present:
$MRNET_ROOT/lib/$MRNET_ARCH/libmrnet.a
: MRNet
API library$MRNET_ROOT/lib/$MRNET_ARCH/libxplat.a
: A library that exports platform dependent routines to MRNet$MRNET_ROOT/bin/$MRNET_ARCH/mrnet_commnode
: MRNet
internal communcation nodeTyping:
UNIX>
make mrnet-tests
builds the mrnet test files. In addition to those files above, you will also generate:
$MRNET_ROOT/bin/$MRNET_ARCH/mrnet_topgen
: MRNet
topology file generator$MRNET_ROOT/bin/$MRNET_ARCH/*_[FE,BE]
: MRNet test front-end and back-end programs$MRNET_ROOT/lib/$MRNET_ARCH/test_DynamicFilters.so
: Shared object used in tests of dynamic filter loading.$MRNET_ROOT/bin/$MRNET_ARCH/mrnet_tests.sh
: A shell script that runs the test programs and checks for errors in an automated fashion.
UNIX>
mrnet_tests.sh [ -l | -r <hostfile> | -a <hostfile> ] [ -f <sharedobject> ]
-l
flag is used to run all tests using only topologies that create processes on the local machine. The -r
flag runs tests using remote machines specified in the file whose name immediately follows this flag. To run test both locally and remotely, use the -a
flag and specify a hostfile to use. To run the programs that test MRNet's ability to dynamically load filters, you must specify the absolute location of the shared object test_DynamicFilters.so
produced when the tests were built.
$PATH
. For testing dynamic filters, the filesystem containing the shared object must be available to all the host machines participating in the test.Table of Contents
The MRNet distribution has two main components: libmrnet.a
, a
library that is linked into a tool's front-end and back-end components, and
mrnet_commnode, a program that runs on intermediate nodes
interposed between the application front-end and back-ends. libmrnet.a
exports an API (See Chapter 5, The MRNet C++ API Reference) that enables
I/O interaction between the front-end and groups of back-ends via MRNet. The
primary purpose of mrnet_commnode is to distribute data
processing functionality across multiple computer hosts and to implement
efficient and scalable group communications. The following sub-sections describe
the lower-level components of the MRNet API in more detail.
An MRNet end-point represents a tool or application process or node. In particular, they represent the back-end processes in the system. The front-end can communicate in a unicast or multicast fashion with one or more of these end-points as described below.
MRNet uses communicators to represent groups of network end-points. Like communicators in MPI, MRNet communicators provide a handle that identifies a set of end-points for point-to-point, multicast or broadcast communications. MPI applications typically have a non-hierarchical layout of potentially identical processes. In contrast, MRNet enforces a tree-like layout of all processes, rooted at the tool front-end. Accordingly, MRNet communicators are created and managed by the front-end, and communication is only allowed between a tool's front-end and its back-ends, i.e. back-ends cannot interact with each other directly via MRNet.
A stream is a logical channel that connects the front-end to the end-points of a communicator. All tool-level communication via MRNet must use these streams. Streams carry data packets downstream, from the front-end toward the back-ends, and upstream, from the back-ends toward the front-end. Upward streams are expected to carry data of a specific type allowing data aggregation operations to be associated with a stream. The type is specified using a format string (See Appendix A, MRNET Format Strings) similar to those used in C formatted I/O primitives, e.g. a packet whose data is described by the format string "%d %d %f %s" contains two integers followed by a float then a character string. MRNet expands the standard specification to allow for specifiers that describe arrays of integers and floats.
Data Aggregation is the process of merging multiple input data packets and transforming them into one or more output packets. Though it is not necessary for the aggregation to result in less or even different data, aggregations that reduce or modify data values are most common. MRNet uses data filters to aggregate data packets. Filters specify an operation to perform and the type of the data expected on the bound stream. Filter instances are bound to a stream at stream creation. MRNet uses two types of filters: synchronization filters and transformation filters. Synchronization filters organize data packets from downstream nodes into synchronized waves of data packets, while transformation filters operate on the synchronized data packets yielding one or more output packets. A distinction between synchronization and transformation filters is that synchronization filters are independent of the packet data type, but transformation filters operate on packets of a specific type.
Synchronization filters operate on data flowing upstream in the network, receiving packets one at a time and outputting packets only when the specified synchronization criteria has occurred. Synchronization filters provide a mechanism to deal with the asynchronous arrival of packets from children nodes; the synchronizer collects packets and typically aligns them into waves, passing an entire wave onward at the same time. Therefore, synchronization filters do no data transformation and can operate on packets in a type-independent fashion. MRNet currently supports three synchronization modes:
Transformation filters can be used on both upstream and downstream data flows. Transformation filters input a group of synchronized packets, and combine data from multiple packets by performing an aggregation that yields one or more new data packets. Since transformation filters are expected to perform computational operations on data packets, there is a type requirement for the data packets to be passed to this type of filter: the data format string of the stream's packets and the filter must be the same. Transformation operations must be synchronous, but can carry state from one transformation to the next using static storage structures. MRNet provides several transformation filters that should be of general use:
Chapter 7, Adding New Filters describes facilities a tool developer may use to add new filters to the provided set.
Table of Contents
A complete description of the MRNet API is in Chapter 5, The MRNet C++ API Reference. This section offers a brief overview only.
Using libmrnet.a
, a tool can leverage a system of internal
processes, instances of the mrnet_commnode
program, as a
communication substrate. After instantiation of the MRNet network (discussed in
the section called “MRNet Instantiation”, the front-end and back-end
processes are connected by the internal processes. The connection topology and
host assignment of these processes is determined by a configuration file, thus
the
geometry of MRNet's process tree can be customized to suit the physical topology
of the underlying hardware resources. While MRNet can generate a variety of
standard topologies, users can easily specify their own topologies; see Chapter 6, MRNET Process-tree
Topologies for further discussion.
The MRNet API contains network, end-point, communicator, and stream objects that a tool's front-end and back-end use for communication. The network object is used to instantiate the MRNet network and access end-point objects that represent available tool back-ends. The communicator object is a container for groups of end-points, and streams are used to send data to the end-points in a communicator.
Example 4.1. MRNet Front-end Sample Code
front_end_main(...) { 1. Network * net; 2. Communicator * comm; 3. Stream * stream; 4. PacketPtr packet; 5. int tag = FirstApplicationTag; 6. float result; 7. net = new Network(topol_config_file, backend_exe, backend_argv); 8. comm = net->get_BroadcastCommunicator( ); 9. stream = net->new_Stream(comm, TFILTER_MAX); 10. stream->send(tag, "%d", FLOAT_MAX_INIT); 11. stream->recv(&tag, packet) 12. packet->unpack("%f", &result); }
A simplified version of code from an example tool front-end is shown in Example 4.1, “MRNet Front-end Sample Code”. In the front-end code, after some variable definitions in lines 1-6, an instance of the MRNet network is created on line 7 using the topology specification in topol_config_file. In line 8, the newly created network object is queried for an auto-generated broadcast communicator that contains all available end-points. In line 9, this communicator is used to establish a stream for which the MRNet internal processes will use a built-in filter that finds the maximum value of the data sent upstream. The front-end then sends one or more initialization messages to the backends; in our example code on line 10, we broadcast an integer initializer on the new stream. The tag parameter is an application-specific value denoting the nature of the message being transmitted. After the send operation, the front-end performs a blocking stream receive at line 11. This call returns a tag and a packet. Finally, line 12 calls unpack to deserialize the floating point value contained in packet.
Example 4.2. MRNet Back-end Sample Code
back_end_main(int argc, char** argv) { 1. Stream * stream; 2. PacketPtr packet; 3. int val, tag; 4. float random_float = (float) random( ); 5. Network * net = new Network(argc,argv); 6. net->recv(&tag, packet, &stream); 7. packet->unpack("%d", &val ); 8. if( val == FLOAT_MAX_INIT ) 9. stream->send(tag, "%f", random_float); }
Example 4.2, “MRNet Back-end Sample Code” shows the code for the back-end that reciprocates the actions of the front-end. Each tool back-end first connects to the appropriate internal process in line 5, using the back-end version of the network constructor that receives its arguments via the program argument vector (argc/argv). While the front-end makes a stream-specific receive call, the back-ends use a stream-anonymous network receive that returns the tag sent by the front-end, the packet containing the actual data sent, and a stream object representing the stream that the front-end has established. Finally, each back-end sends a scalar floating point value upstream toward the front-end.
A complete example of MRNet code can be found below in Appendix B, A Complete MRNet Example: Integer Addition.
While conceptually simple, creating and connecting the internal processes is complicated by interactions with the various job scheduling systems. In the simplest environments, we can launch jobs manually using facilities like rsh or ssh. In more complex environments, it is necessary to submit all requests to a job management system. In this case, we are constrained by the operations provided by the job manager (and these vary from system to system). We currently support two modes of instantiating MRNet-based tools.
In the first mode of process instantiation, MRNet creates the internal and back-end processes, using the specified MRNet topology configuration to determine the hosts on which the components should be located. First, the front-end consults the configuration and uses rsh or ssh to create internal processes for the first level of the communication tree on the appropriate hosts. Upon instantiation, the newly created processes establish a network connection to the process that created it. The first activity on this connection is a message from parent to child containing the portion of the configuration relevant to that child. The child then uses this information to begin instantiation of the sub-tree rooted at that child. When a sub-tree has been established, the root of that sub-tree sends a report to its parent containing the end-points accessible via that sub-tree. Each internal node establishes its children processes and their respective connections sequentially. However, since the various processes are expected to run on different compute nodes, sub-trees in different branches of the network are created concurrently, maximizing the efficiency of network instantiation.
In the second mode of process instantiation, MRNet relies on a process management system to create some or all of the MRNet processes. This mode accommodates tools that require their back-ends to create, monitor, and control other processes. For example, IBM's POE uses environment variables to pass information, such as the process' rank within the application's global MPI communicator, to the MPI run-time library in each application process. In cases like this, MRNet cannot provide back-end processes with the environment necessary to start MPI application processes. As a result, MRNet creates its internal processes recursively as in the first instantiation mode, but does not instantiate any back-end processes. MRNet then starts the tool back-ends using the process management system to ensure they have the environment needed to create application processes successfully. When starting the back-ends, the front-end must provide them with the information needed to connect to the MRNet internal processes, such as the leaf processes' host names and connection port numbers. This information can be provided via the environment, using shared filesystems or other information services as available on the target system. To collect the necessary information, the front-end can use the MRNet API methods for discovering the network topology details.
All classes are included in the MRN
namespace.
For this discussion, we do not explicitly include reference to the
namespace; for example, when we reference the class Network
, we are
implying the class MRN::Network
.
In MRNet, there are five top-level classes: Network
,
NetworkTopology
, Communicator
,
Stream
, and Packet
.
The Network class primarily contains methods for instantiating and destroying MRNet
process trees. The NetworkTopology class represents
the interface for discovering details about the topology of an instantiated Network.
Application back-ends are referred to as end-points, and the Communicator
class is used to reference a group of end-points. A Communicator is used to
establish a Stream for unicast, multicast, or broadcast communications
via the MRNet infrastructure. The Packet class encapsulates the data packets
that are sent on a Stream. The public members of these classes are detailed below.
void Network::Network( | topology, | |
backend_exe, | ||
backend_argv, | ||
rank_backends, | ||
using_memory_buffer) ; |
const char * | topology; |
const char * | backend_exe; |
const char ** | backend_argv; |
bool | rank_backends =true; |
bool | using_memory_buffer =false; |
The front-end constructor method that is used to instantiate the MRNet process tree.
topology
is the path to a configuration file that describes the desired process tree topology.backend_exe
is the path to the executable to be used for the application's back-end processes.backend_argv
is a null terminated list of arguments to pass to the back-end application upon creation.rank_backends
indicates whether the back-end process ranks should begin at 0, similar to MPI rank numbering, and defaults to true. Ifusing_memory_buffer
is set to true (default is false), thetopology
parameter is actually a pointer to a memory buffer containing the topology specification, rather than the name of a file.When this function completes without error, all MRNet processes specified in the topology will have been instantiated.
Note
Ifbackend_exe
is NULL, no back-end processes will be started, and the leaves of the topology specified bytopology
will be instances of mrnet_commnode.
Note
When starting internal and back-end processes, MRNet will use ssh to start remote processes unless the environment variableXPLAT_RSH
is set to a different command. If it is necessary to run the remote process starter command (e.g., rsh) with a utility like runauth to non-interactively authenticate the unattended remote process, that command may be specified using theXPLAT_REMCMD
environment variable.
void Network::Network( | argc, | |
argv) ; |
int | argc; |
char ** | argv; |
The back-end constructor method that is used when the process is started due to a
front-end Network instantiation. MRNet automatically passes the necessary
information to the process using the program argument vector
(argc/argv
) by inserting it after the user-specified arguments.
void Network::Network( | parent_hostname, | |
parent_port, | ||
parent_rank, | ||
my_hostname, | ||
my_rank) ; |
const char * | parent_hostname; |
Port | parent_port; |
Rank | parent_rank; |
const char * | my_hostname; |
Rank | my_rank; |
The back-end constructor method that is used to attach to an instantiated MRNet process tree, as is necessary when the back-end processes are not started as part of a front-end Network instantiation.parent_hostname
is the name of the host where the parent process is running.parent_port
andparent_rank
are the port number and rank of the parent process, respectively. Information about the tree processes to which back-ends should connect can be gathered by the front-end using the NetworkTopology object returned fromNetwork::get_NetworkTopology
.my_hostname
is the name of the host on which the back-end process is running, andmy_rank
is an arbitrary rank chosen by the back-end to not conflict with the ranks of existing tree processes.
void Network::~Network(
)
;
Network::~Network
is used to tear down the MRNet process tree and clean up the Network object. The first action taken by the destructor is to invokeNetwork::shutdown_Network
.
void Network::shutdown_Network(
)
;
Network::shutdown_Network
is used to tear down the MRNet process tree. When this function is called, each node in the tree sends a control message to its immediate children informing them of the "shutdown network" request, and waits for confirmation. If the node is an internal process (i.e., mrnet_commnode), the process will then terminate. If the node is an application back-end, the process will terminate unless a separate call toNetwork::set_TerminateBackEndsOnShutdown
has been made to request otherwise.
void Network::set_TerminateBackEndsOnShutdown( | terminate) ; |
bool | terminate; |
Network::set_TerminateBackEndsOnShutdown
is used to control whether application back-end processes are terminated when the MRNet Network is shutdown. By default, back-end processes will be terminated. If this is not desirable, call this method withterminate
set to false.
int Network::recv( | tag, | |
packet, | ||
stream, | ||
blocking) ; |
int * | tag; |
PacketPtr & | packet; |
Stream ** | stream; |
bool | blocking =true; |
Network::recv
is used to invoke a stream-anonymous receive operation. Any packet available (addressed to any stream) will be returned (in roughly FIFO ordering) via the output parameters.otag
will be filled in with the integer tag value that was passed by the correspondingStream::send()
operation.packet
is the packet that was received. A pointer to the Stream to which the packet was addressed will be returned instream
.blocking
is used to signal whether this call should block or return if data is not immediately available; it defaults to a blocking call. A return value of -1 indicates an error, 0 indicates no packets were available, and 1 indicates success.
int Network::load_FilterFunc( | so_file, | |
func_name, | ||
is_transformation_filter) ; |
const char * | so_file; |
const char * | func_name; |
bool | is_transformation_filter =true; |
This method, used for loading new filter operations into the Network is conveniently similar to the conventional
dlopen()
facilities for opening a shared object and dynamically loading symbols defined within.so_file
is the path to a shared object file that contains the filter function to be loaded andfunc_name
is the name of the function to be loaded. The last parameteris_transformation_filter
defaults to true and can usually be omitted since the common case is to load transformation, not synchronization, filters.On success,
Network::load_FilterFunc
returns the id of the newly loaded filter which may be used in subsequent calls toNetwork::new_Stream
. A value of -1 is returned on failure.
void Network::print_error( | error_msg) ; |
const char * | error_msg; |
Network::print_error
prints a message to stderr describing the last error encountered during a MRNet library call. It first prints the null-terminated string,error_msg
followed by a colon then the actual error message followed by a newline.
void Network::get_DataSocketFds( | fd_array, | |
fd_array_size) ; |
int ** | fd_array; |
unsigned int * | fd_array_size; |
Network::get_DataSocketFds
is used to notify an application of all the file descriptors MRNet is using for data communication. This function returns an array of sizefd_array_size
file descriptors in the output arrayfd_array
. On front-ends, the array should contain an entry for each child, while on back-ends the array should contain a single entry for the parent.
Instances of NetworkTopology are network specific, so they are created when a Network is instantiated. MRNet API users should not need to create their own NetworkTopology instances.
NetworkTopology * Network::get_NetworkTopology( | ) ; |
Network::get_NetworkTopology
is used to retrieve
a pointer to the underlying NetworkTopology instance of a Network.
unsigned int NetworkTopology::get_NumNodes( | ) ; |
This function returns the total number of nodes in the tree topology, including front-end, internal, and back-end processes.
NetworkTopology::Node * NetworkTopology::find_Node( | node_rank) ; |
Rank | node_rank; |
This function returns a pointer to the tree node with rank equal to node_rank
,
or NULL if not found.
NetworkTopology::Node * NetworkTopology::get_Root( | ) ; |
This function returns a pointer to the root node of the tree, or NULL if not found.
void NetworkTopology::get_Leaves( | leaves) ; |
std::vector< NetworkTopology::Node * > & | leaves; |
This function fills in the leaves
vector with pointers to the
leaf nodes in the topology. In the case where back-end processes are not
started when the Network is instantiated, a front-end process can use this
function to retrieve information about the leaf internal processes to which
the back-ends should attach.
std::set< NetworkTopology::Node * > NetworkTopology::get_BackEndNodes( | ) ; |
This function returns a set containing pointers to all back-end process tree nodes.
std::set< NetworkTopology::Node * > NetworkTopology::get_ParentNodes( | ) ; |
This function returns a set containing pointers to all tree nodes that are parents (i.e., those nodes having at least one child).
std::set< NetworkTopology::Node * > NetworkTopology::get_OrphanNodes( | ) ; |
This function returns a set containing pointers to all tree nodes that have no parent due to a failure.
void NetworkTopology::get_TreeStatistics( | num_nodes, | |
depth, | ||
min_fanout, | ||
max_fanout, | ||
avg_fanout, | ||
stddev_fanout) ; |
unsigned int & | num_nodes; |
unsigned int & | depth; |
unsigned int & | min_fanout; |
unsigned int & | max_fanout; |
double & | avg_fanout; |
double & | stddev_fanout; |
This function fills in the values of each of the parameters.num_nodes
is the total number of tree nodes (same as the value returned byNetworkTopology::get_NumNodes
),depth
is the depth of the tree (i.e., the maximum path length from root to any leaf),min_fanout
is the minimum number of children of any parent node,max_fanout
is the maximum number of children of any parent node,avg_fanout
is the average number of children across all parent nodes, andstddev_fanout
is the standard deviation in number of children across all parent nodes.
std::string NetworkTopology::Node::get_HostName( | ) ; |
This function returns a character string identifying the hostname of the tree node.
Port NetworkTopology::Node::get_Port(
)
;
This function returns the connection port of the tree node.
Rank NetworkTopology::Node::get_Rank(
)
;
This function returns the unique rank of the tree node.
const std::set< NetworkTopology::Node * > & NetworkTopology::Node::get_Children( | ) ; |
This function returns a set containing pointers to the children of the tree node, and is useful for navigating through the tree.
unsigned int NetworkTopology::Node::get_NumChildren( | ) ; |
This function returns the number of children of the tree node.
unsigned int NetworkTopology::Node::find_SubTreeHeight( | ) ; |
This function returns the height of the subtree rooted at this tree node.
Instances of Communicator are network specific, so their creation methods are functions of an instantiated Network object.
Communicator * Network::new_Communicator(
)
;
This function returns a pointer to a new Communicator object. The object
initially contains no end-points. Use
Communicator::add_EndPoint
to populate the Communicator.
Communicator * Network::new_Communicator( | comm) ; |
Communicator & | comm; |
This function returns a pointer to a new Communicator object that initially
contains the set of end-points contained in comm
.
Communicator * Network::new_Communicator( | endpoints) ; |
std::set< CommunicationNode * > & | endpoints; |
This function returns a pointer to a new Communicator object that initially
contains the set of end-points contained in endpoints
.
Communicator * Network::get_BroadcastCommunicator( | ) ; |
This function returns a pointer to a default broadcast Communicator containing all the end-points available in the system at the time the function is called. Multiple calls to this function return the same pointer to the broadcast communicator object created at network instantiation. If the Network's topology changes, as can occur when starting back-ends separately, the object will be updated to reflect the additions or deletions. This object should not be deleted.
bool Communicator::add_EndPoint( | ep_rank) ; |
Rank | ep_rank; |
This function is used to add an existing end-point with rankep_rank
to the set contained by the Communicator. The original set of end-points contained by the Communicator is tested to see if it already contains the potentially new end-point. If so, the function silently returns successfully. This function fails if there exists no end-point defined byep_rank
. This function returns true on success, false on failure.
bool Communicator::add_EndPoint( | endpoint) ; |
CommunicationNode * | endpoint; |
This function is similar to the add_EndPoint()
above except that it takes a pointer to a CommunicationNode object
instead of a rank. Success and failure conditions are exactly as stated above.
This function also returns true on success and false on failure.
const std::set< CommunicationNode * > & Communicator::get_EndPoints( | ) ; |
This function returns a reference to the set of CommunicationNode pointers comprising the end-points in the Communicator.
std::string CommunicationNode::get_HostName( | ) ; |
This function returns a character string identifying the hostname of the end-point represented by this CommunicationNode. is out of range.
Port CommunicationNode::get_Port(
)
;
This function returns the connection port of the end-point represented by this CommunicationNode.
Rank CommunicationNode::get_Rank(
)
;
This function returns the unique rank of the end-point represented by this CommunicationNode.
Instances of Stream are network specific, so their creation methods are functions of an instantiated Network object.
Stream * Network::new_Stream( | comm, | |
up_transfilter_id, | ||
up_syncfilter_id, | ||
down_transfilter_id) ; |
Communicator * | comm; |
int | up_transfilter_id =TFILTER_NULL; |
int | up_syncfilter_id =SFILTER_WAITFORALL; |
int | down_transfilter_id =TFILTER_NULL; |
Network::new_Stream()
creates a Stream object attached to the end-points specified by thecomm
argument. The second argumentup_transfilter_id
specifies the transformation filter to apply to data flowing upstream from the application back-ends toward the front-end; the default value is "Null Filter".up_syncfilter_id
specifies the synchronization filter to apply to upstream packets; the default value is "Wait-for-all".down_transfilter_id
allows the user to specify a filter to apply to downstream data flows; the default value is "Null Filter".
Stream * Network::get_Stream( | iid) ; |
unsigned int | iid; |
Network::get_Stream()
returns a pointer to the Stream identified byid
, or NULL on failure.
unsigned int Stream::get_Id(
)
;
This function returns the integer identifier for this Stream.
const std::set< Rank > & Stream::get_EndPoints( | ) ; |
This function returns the set of end-point ranks for this Stream.
unsigned int Stream::size(
)
;
This function returns an integer indicating the number of end-points for this Stream.
int Stream::send( | tag, | |
format_string, | ||
...) ; |
int | tag; |
const char * | format_string; |
This function invokes a data send operation on the calling Stream.tag
is an integer identifier that is expected to classify the data in the packet to be transmitted across the Stream.format_string
is a format string describing the data in the packet (See Appendix A, MRNET Format Strings for a full description.) On success,Stream::send()
returns 0; on failure -1.Note
tag
must have a value greather than or equal to the constant "FirstApplicationTag" defined by MRNet. Tag values less than "FirstApplicationTag" are reserved for internal MRNet use.
int Stream::recv( | tag, | |
packet, | ||
blocking) ; |
int * | tag; |
PacketPtr & | packet; |
bool | blocking =true; |
Stream::recv()
invokes a stream-specific receive operation. Packets addressed to the calling stream will be returned in strictly FIFO ordering via the output parameters.tag
will be filled in with the integer tag value that was passed by the correspondingStream::send()
operation.packet
is the recieved Packet.blocking
determines whether the receive should block or return if data is not immediately available; it defaults to a blocking call. A return value of -1 indicates an error, 0 indicates no packets were available, and 1 indicates success.
int Stream::flush(
)
;
This function commits a flush of all packets currently buffered by the stream pending an output operation. A successful return indicates that all packets on the calling stream have been passed to the operating system for network transmission.
int Stream::set_FilterParameters( | upstream, | |
format_string, | ||
...) ; |
bool | upstream; |
const char * | format_string; |
Stream::set_FilterParameters
allows users to dynamically configure the operation of a Stream transformation filter by passing arbitrary data in a similar fashion toStream::send
. When the filter executes, the passed data is available as a PacketPtr parameter to the filter, and the filter can extract the configuration settings. When set to true,upstream
indicates the upstream transformation filter should be updated, while a value of false will update the downstream transformation filter.
A Packet encapsulates a chunk of formatted data sent on a Stream.
Packets are created using a format string (e.g.,
"%s %d"
describes a null-terminated string followed
by a 32-bit integer, and the Packet is said to contain 2 data elements).
MRNet front-end and back-end processes do not create instances of Packet;
instead they are automatically produced from the
formatted data passed to Stream::send
.
Appendix A, MRNET Format Strings contains the full listing of data types
that can be sent in a Packet.
When receiving a Packet via Stream::recv
or
Network::recv
,
the Packet instance is stored within a PacketPtr object. PacketPtr is a class
based on the Boost library shared_ptr class, and helps with memory management
of Packets. A PacketPtr can be assumed to be equivalent to "Packet *", and
all operations on Packets require use of PacketPtr.
int Packet::get_Tag(
)
;
This function returns the integer tag associated with the Packet.
unsigned short Packet::get_StreamId(
)
;
This function returns the stream id associated with the Packet.
const char * Packet::get_FormatString(
)
;
This function returns the character string specifying the data format of the Packet.
void Packet::unpack( | format_string, | |
...) ; |
const char * | format_string; |
This function extracts data contained within a Packet according to theformat_string
, which must match that of the Packet. The function arguments followingformat_string
should be pointers to the appropriate types of each data item. For string and array data types, new memory buffers to hold the data will be allocated usingmalloc()
, and it is the user's responsibility tofree()
these strings and arrays.
void Packet::set_DestroyData(
destroy, ...)
;
bool destroy
;
This function can be used to tell MRNet whether or not to deallocate the string and array data members of a Packet. Ifdestroy
is true, string and array data members will be deallocated usingfree()
when the Packet destructor is executed. Note this assumes they were allocated usingmalloc()
. The default behavior for user-generated Packets is not to deallocate (false). Turning on deallocation is useful in filter code that must allocate strings or arrays for output Packets, which cannot be freed before the filter function returns.
Table of Contents
MRNet allows a tool to specify a node allocation and process connectivity tailored to its computation and communication requirements and to the system where the tool will run. Choosing an appropriate MRNet configuration can be difficult due to the complexity of the tool's own activity and its interaction with the system. This section describes how users define their own process topologies, and the mrnet_topgen utility provided by MRNet to facilitate the process.
The first parameter to the Network::Network()
front-end constructor is the name of an MRNet topology file. This file defines the
topological layout of the front-end, internal nodes, and back-end MRNet processes.
In the syntax of the topology file, the
hostname:id
tuple represents a process with a MRNet instance
id
running on hostname
. It is important
to note that the id
is of symbolic value only and does not
reflect a port or process number associated with the system. A line in the
topology file is always of the form:
hostname1:0 => hostname1:1 hostname1:2 ;
meaning a process on hostname1
with MRNet id 0
has two children, with MRNet ids 1 and 2, running on the
same host. MRNet will parse the topology file without error if the file
properly defines a tree in the mathematical sense (i.e. a tree must have
a single root, no cycles, full connection, and no node can be its own
descendant).
hostname1:0 => hostname1:1 hostname1:2 ;
When the MRNet test programs are built, a topology generator program,
$MRNET_ROOT/bin/$MRNET_ARCH/mrnet_topgen
, will also
be created. The usage of this program is:
mrnet_topgen <OPTION> [INFILE] [OUTFILE] Create a MRNet topology specification from machine list in INFILE or standard input, and writes output to OUTFILE or standard output. -b topology, --balanced=topology Create a balanced tree using "topology" specification. The specification is in the format F^D, where F is the fan-out (or out-degree) and D is the tree depth. The number of tree leaves (or back-ends) will be F^D. Example: "16^3" means a tree of depth 3 with fan-out 16, with 4096 leaves. -o topology, --other=topology Create a generic tree using "topology" specification. The specification for this option is (the agreeably complicated) N:N,N,N:... where N specifies the number of children a node has, ',' distinguishes nodes on the same level, and ':' separates the tree into levels. Example 1: "2:2,2" specifies a tree where the root has 2 children and each child on the 2nd level has 2 children. Example 2: "2:8,4" specifies a tree where the root has 2 children. At the 2nd level, the 1st child has 8 children, and the 2nd child has 4 children
The specified input machine list must contain enough hosts to support the entire process tree. mrnet_topgen assumes one process will be placed on each host. To place multiple processes on the same host, the list should contain the host's name multiple times.
A filter function has the following signature
void filter_name( | packets_in, | |
packets_out, | ||
packets_out_reverse, | ||
local_storage, | ||
config_params) ; |
std::vector< PacketPtr > & | packets_in; |
std::vector< PacketPtr > & | packets_out; |
std::vector< PacketPtr > & | packets_out_reverse; |
void ** | local_storage; |
PacketPtr & | config_params; |
packets_in
is a reference to a vector of Packets serving
as input to the filter function. packets_out
is a
reference to a vector into which output Packets should be placed. In the
rare case where Packets need to be sent in the reverse direction on the
Stream, packets_out_reverse
should be used instead of
packets_out
.
local_storage
may be used to define and maintain
state specific to a filter instance. Finally, config_params
is a reference to a PacketPtr containing the current configuration settings
for the filter instance, as can be set using Stream::set_FilterParameters
.
For each filter function defined in a shared object file, there must be a
const char * symbol named
by the string formed by the concatenation of the filter function name and
the suffix "_format_string". For instance, if the filter function is named
my_filter_func
, the shared object must define a symbol
const char *my_filter_func_format_string
.
The value of this string will be the MRNet format string describing the
format of data that the filter can operate on. A value of ""
denotes that the filter can operate on data of arbitrary value.
This topic currently pertains to use with the GNU C++ compiler only.
Since we use the C facility dlopen()
to dynamically
load new filter functions, all C++ symbols must be exported. That is,
the symbol definitions must fall with the statements
extern "C" {
and
}
The file that contains the filter functions and format strings may be
compiled with the GNU compiler options "-fPIC -shared -rdynamic"
to produce a valid shared object.
A front-end that will dynamically load filters must be built
with the GNU compiler options "-Wl,-E"
to notify the linker
export global symbols externally.
After the % character that introduces a conversion, there may be a number of flag characters. u, h, l, and a are special modifiers meaning unsigned, short, long and array, respectivley. The full set of conversions are:
c | Matches a signed 8-bit character |
uc | Matches an unsigned 8-bit character |
ac | Matches an array of signed 8-bit characters |
auc | Matches an array of unsigned 8-bit characters |
hd | Matches a signed 16-bit decimal integer |
uhd | Matches an unsigned 16-bit decimal integer |
ahd | Matches an array of signed 16-bit decimal integers |
auhd | Matches an array of unsigned 16-bit decimal integers |
d | Matches a signed 32-bit decimal integer |
ud | Matches an unsigned 32-bit decimal integer |
ad | Matches an array of signed 32-bit decimal integers |
aud | Matches an array of unsigned 32-bit decimal integers |
ld | Matches a signed 64-bit decimal integer |
uld | Matches an unsigned 64-bit decimal integer |
ald | Matches an array of signed 64-bit decimal integers |
auld | Matches an array of unsigned 64-bit decimal integers |
f | Matches a 32-bit floating-point number |
af | Matches an array of 32-bit floating-point numbers |
lf | Matches a 64-bit floating-point number |
alf | Matches an array of 64-bit floating-point numbers |
s | Matches a null-terminated character string. |
as | Matches an array of null-terminated character strings. |
Example B.1. A Complete MRNet Front-End
#include "mrnet/MRNet.h" #include "IntegerAddition.h" using namespace MRN; int main(int argc, char **argv) { int send_val=32, recv_val=0; int tag, retval; PacketPtr p; if( argc != 4 ){ fprintf(stderr, "Usage: %s topology_file backend_exe so_file\n", argv[0]); exit(-1); } const char * topology_file = argv[1]; const char * backend_exe = argv[2]; const char * so_file = argv[3]; const char * dummy_argv=NULL; // This Network() cnstr instantiates the MRNet internal nodes, according to the // organization in "topology_file," and the application back-end with any // specified cmd line args Network * network = new Network( topology_file, backend_exe, &dummy_argv ); // Make sure path to "so_file" is in LD_LIBRARY_PATH int filter_id = network->load_FilterFunc( so_file, "IntegerAdd" ); if( filter_id == -1 ){ fprintf( stderr, "Network::load_FilterFunc() failure\n"); delete network; return -1; } // A Broadcast communicator contains all the back-ends Communicator * comm_BC = network->get_BroadcastCommunicator( ); // Create a stream that will use the Integer_Add filter for aggregation Stream * stream = network->new_Stream( comm_BC, filter_id, SFILTER_WAITFORALL); int num_backends = comm_BC->get_EndPoints().size(); tag = PROT_SUM; unsigned int num_iters=5; // Broadcast a control message to back-ends to send us "num_iters" // waves of integers if( stream->send( tag, "%d %d", send_val, num_iters ) == -1 ){ fprintf( stderr, "stream::send() failure\n"); return -1; } if( stream->flush( ) == -1 ){ fprintf( stderr, "stream::flush() failure\n"); return -1; } // We expect "num_iters" aggregated responses from all back-ends for( unsigned int i=0; i<num_iters; i++ ){ retval = stream->recv(&tag, p); assert( retval != 0 ); //shouldn't be 0, either error or block till data if( retval == -1){ //recv error return -1; } if( p->unpack( "%d", &recv_val ) == -1 ){ fprintf( stderr, "stream::unpack() failure\n"); return -1; } if( recv_val != num_backends * i * send_val ){ fprintf(stderr, "Iteration %d: Success! recv_val(%d) != %d*%d*%d=%d (send_val*i*num_backends)\n", i, recv_val, send_val, i, num_backends, send_val*i*num_backends ); } else{ fprintf(stderr, "Iteration %d: Success! recv_val(%d) == %d*%d*%d=%d (send_val*i*num_backends)\n", i, recv_val, send_val, i, num_backends, send_val*i*num_backends ); } } if(stream->send(PROT_EXIT, "") == -1){ fprintf( stderr, "stream::send(exit) failure\n"); return -1; } if(stream->flush() == -1){ fprintf( stderr, "stream::flush() failure\n"); return -1; } // The Network destructor will cause all internal and leaf tree nodes to exit delete network; return 0; }
Example B.2. A Complete MRNet Back-End
#include "mrnet/MRNet.h" #include "IntegerAddition.h" using namespace MRN; int main(int argc, char **argv) { Stream * stream=NULL; PacketPtr p; int tag=0, recv_val=0, num_iters=0; Network * network = new Network( argc, argv ); do{ if ( network->recv(&tag, p, &stream) != 1){ fprintf(stderr, "stream::recv() failure\n"); return -1; } switch(tag){ case PROT_SUM: p->unpack( "%d %d", &recv_val, &num_iters ); // Send num_iters waves of integers for( unsigned int i=0; i<num_iters; i++ ){ if( stream->send(tag, "%d", recv_val*i) == -1 ){ fprintf(stderr, "stream::send(%%d) failure\n"); return -1; } if( stream->flush( ) == -1 ){ fprintf(stderr, "stream::flush() failure\n"); return -1; } } break; case PROT_EXIT: fprintf( stdout, "Processing PROT_EXIT ...\n"); break; default: fprintf(stdout, "Unknown Protocol: %d\n", tag); break; } } while ( tag != PROT_EXIT ); return 0; }
Example B.3. An MRNet Filter: Integer Addition
extern "C" { //Must Declare the format of data expected by the filter const char * IntegerAdd_format_string = "%d"; void IntegerAdd( std::vector< PacketPtr > & packets_in, std::vector< PacketPtr > & packets_out, std::vector< PacketPtr > & /* packets_out_reverse */, void ** /* client data */, PacketPtr & /* params */ ) { int sum = 0; for( unsigned int i = 0; i < packets_in.size( ); i++ ) { PacketPtr cur_packet = packets_in[i]; int val; cur_packet->unpack("%d", &val); sum += val; } PacketPtr new_packet ( new Packet(packets_in[0]->get_StreamId(), packets_in[0]->get_Tag(), "%d", sum ) ); packets_out.push_back( new_packet ); } } /* extern "C" */
Example B.4. An MRNet Topology File
nutmeg:0 => c01:0 c02:0 c03:0 c04:0 ; c03:0 => c05:0 ; c04:0 => c06:0 c07:0 c08:0 c09:0 ; c08:0 => c10:0 ; c09:0 => c11:0 ; # nutmeg # | # | # ------- # /| |\ # / | | \ # / | | \ # / | | \ # c01 c02 c03 c04 # | | # c05 | # ------- # / | | \ # / | | \ # / | | \ # c06 c07 c08 c09 # | | # c10 c11