Group Communication

A central component in many distributed systems is a group communication service. The service provides the means to manage a dynamic group of processes, including a set of messaging primitives. One use is in a setting where a set of replicated servers must be kept coordinated.

In this example we implement group communication that provides: leader election, atomic multicast and support for synchronized state transfer.


A process can create a new group and will then be given a group process and a unique group reference. All communication to the group should be done through the group process and all control messages from the group will be tagged with the group reference. The group process is linked to the creator of the group.

A process that has access to a group process can request to join the group. If the process is allowed to join, it is given a new group process and a message from the leader.

The new group process is linked to the joining process, which should use it in all further communication with the group. The message from the leader can be used to transfer a state from the group to the new member.

Nothing prevents a process from joining multiple times; a new group process is created for each successful join operation. This will result in duplicate messages being sent to the process.

The implementation is fault tolerant: it will continue to deliver messages even if processes fail. It is, however, important to understand the limitations of the implementation.

failure detectors

We will build our service using the Erlang monitor primitive. It will be assumed that a down message is an accurate detection of a failed process and that, if a down message is delivered, it will be the last message from that process. This is of course not true in general, since a process can merely be disconnected, in which case a down message is only a suspicion.

If we have a down message that is the result of a network problem ('noconnection') we have a problem. If the leader receives such a message for one of the slaves, the slave is simply removed from the group and assumed to have died. No checks are made to make sure that further messages do not originate from the slave.

If a slave receives a 'noconnection' message from the leader we will not be able to guarantee the properties that we will describe. In the worst case we could end up with a partitioned group and a leader in each group. To avoid this the slave will exit with the reason 'noconnection'.
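The policy described so far can be summarised as a small decision function. This is a Python model of the text above, not the Erlang implementation; the function name `on_down` and the action strings it returns are hypothetical.

```python
def on_down(role, monitored, reason):
    """Decide what a group process does when a monitor reports a process down.

    role      -- 'leader' or 'slave': the role of the process receiving the message
    monitored -- 'leader' or 'slave': the role of the process reported down
    reason    -- exit reason carried by the down message, e.g. 'noconnection'
    """
    if role == "leader" and monitored == "slave":
        # The leader trusts the detector: the slave is dropped whatever the reason.
        return "remove_from_group"
    if role == "slave" and monitored == "leader":
        if reason == "noconnection":
            # Could be a partition; exit rather than risk two leaders.
            return "exit_noconnection"
        return "elect_next_leader"
    # Slaves do not monitor each other in this model.
    return "ignore"
```

Only the slave side distinguishes between exit reasons; the leader treats every down message as a real failure.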

There is an alternative solution and that is to treat a 'noconnection' message from the leader as a proper detection of a node crash. The implementation would then have to change since we could have lost several messages and our implementation of reliable multicast would not be accurate.

Assuming that the network is reliable and nodes do not crash, this is not a problem, but how much can you trust the environment? It is obvious that this is not a solution for a wide area network and certainly not for an Internet setting.

leader election

The implementation is based on an elected leader that is used as a sequencer of messages. A group process is elected leader and will monitor all processes in the group; regular members, called slaves, will monitor the leader. Apart from knowing the leader, each member holds a sequence of peers. The sequence is the same for all members, so if the leader dies, leader election is trivial: the next process in the sequence is the leader.

A newly elected leader will start by monitoring all slaves and the slaves will monitor the new leader. If a process is elected that is not alive, this will be detected by all peers and the second in line is elected.
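Because every member holds the same sequence, election needs no message exchange. The sketch below models the outcome in Python; the function name `elect` and the `alive` set are hypothetical (in the service, liveness is learned through monitors, one candidate at a time, rather than looked up in a set).

```python
def elect(peers, alive):
    """Return (new_leader, remaining_peers) for a shared peer sequence.

    peers -- the sequence of peers, identical at every member
    alive -- set of processes that are actually alive (assumed known here)
    """
    for i, candidate in enumerate(peers):
        if candidate in alive:
            # Every member skips the same dead candidates and so agrees on the result.
            return candidate, peers[i + 1:]
    return None, []
```

For example, `elect(["b", "c", "d"], {"c", "d"})` skips the dead process `b` and yields `c` as leader with `["d"]` as the remaining sequence.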

view delivery

When a member of the group dies or a new member is added to the group, the leader will send a new view to the members of the group. The view is delivered using the same mechanism that is used for total ordering of multicast messages, and the view will thus be delivered by all members of the group in total order. All members of the group will agree on which multicast messages are delivered before the view change and which are delivered after it.

If a member that is not the leader dies, this is detected by the leader. The leader then distributes a new view from which the dead process is excluded. This procedure is not strictly necessary, since neither the leader election nor the atomic multicast depends on the sequence of peers being updated. The view kept by a member could always contain processes that have died, so removing dead peers is not needed for the service to be consistent. The system would work even if the leader did not monitor the members of the group.

A process that wants to join the group will send a message to the leader. The leader will, in a synchronous call, ask the application layer whether the new process should be allowed to join the group. The application layer has the possibility to deliver a state to an accepted process. The first view message delivered to the joining process will thus contain not only the members of the group but also the state of the system at the point in time when the process was added.
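The join step can be modelled as follows. This is a Python sketch under assumptions: the names `handle_join` and `app_accepts` are hypothetical, and the view is represented as a plain dictionary rather than whatever message format the service uses.

```python
def handle_join(peers, joiner, app_accepts):
    """Model of the leader handling a join request.

    peers       -- current sequence of peers
    joiner      -- the process asking to join
    app_accepts -- application callback returning (True, state) or (False, None)
    Returns (new_peers, first_view); first_view is None if the join is refused.
    """
    accepted, state = app_accepts(joiner)  # synchronous call to the application layer
    if not accepted:
        return peers, None
    new_peers = peers + [joiner]
    # The first view seen by the joiner carries both the membership and the state.
    first_view = {"peers": new_peers, "state": state}
    return new_peers, first_view
```

Since the view travels in the same ordered stream as multicast messages, the state in the first view corresponds to a well-defined point in that stream.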

atomic multicast

Provided we have a leader, implementing atomic multicast is quite simple. A group member will simply send a multicast request to the leader and the leader will multicast messages in the order that requests arrive. If processes did not die there would be nothing more to it but we have to handle the situation where the leader dies in the middle of a multicast operation. If the leader only manages to send the message to some of the members we could end up with a situation where these members deliver the message while the others have not seen the message.
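The failure-free case can be modelled in a few lines of Python. The class name `Leader` and the per-member delivery logs are hypothetical; the sketch only illustrates why a single sequencer gives every member the same order.

```python
class Leader:
    """Toy sequencer: forwards each request to every member in arrival order."""

    def __init__(self, member_names):
        # One delivery log per member.
        self.logs = {name: [] for name in member_names}

    def request(self, msg):
        # Requests are handled one at a time, so every member sees the same order.
        for log in self.logs.values():
            log.append(msg)


leader = Leader(["a", "b", "c"])
for msg in ["m1", "m2", "m3"]:
    leader.request(msg)
```

After the three requests, every log in `leader.logs` holds `["m1", "m2", "m3"]`. The model has no failures; the interesting case is a leader that stops partway through the loop in `request`.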

The general, but expensive, solution is to have every receiver of a multicast message forward the message to all other members before delivering it to the application layer. This of course means that each multicast message generates $O(n^2)$ messages, an overhead that we might not always be willing to pay.
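To see where the quadratic cost comes from, assume a group of $n$ members including the leader, and assume (a detail not stated above) that each receiver forwards to every member except itself and the leader:

$$
\underbrace{(n-1)}_{\text{leader to slaves}} + \underbrace{(n-1)(n-2)}_{\text{slaves forwarding}} = (n-1)^2 = O(n^2)
$$

For $n = 10$ this is 81 messages per multicast, compared with 9 when only the leader sends.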

The service therefore provides one function that does guarantee uniform atomic multicast (uacast) and also a more efficient version that does not give us the uniformity property (abcast).

The algorithm that we implement is as follows: a slave keeps a copy of the last multicast message received from the leader. If the leader dies, the new leader will forward this message to all other members of the group. The algorithm relies on the fact that the leader sends the multicast message to the members of the group in the same order as they appear in the sequence of peers. This means that if the leader dies, either the first process in the sequence has received the message or no process has received the message.

The algorithm does provide atomic multicast but not uniform atomic multicast. The problem is that a process will deliver any received multicast message to its application layer as soon as it arrives. This means that if a leader sends a message to the first slave, that slave will immediately deliver the message. If the leader now dies it is of course the responsibility of this slave to forward the message to all other members. However, if the slave also dies we have a situation where one member delivered the message but the message has not been received by any other member.
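Both outcomes can be replayed with a small Python model. The function names are hypothetical, `peers` lists the slaves in sequence order (the crashed leader is not included), and duplicates are discarded by simple comparison, which is an assumption since the text does not say how duplicates are detected.

```python
def multicast_with_crash(peers, msg, sent_before_crash):
    """The leader sends msg to peers in sequence order and crashes after
    `sent_before_crash` sends. Returns (delivered, last) per peer."""
    delivered = {p: [] for p in peers}
    last = {p: None for p in peers}
    for p in peers[:sent_before_crash]:
        delivered[p].append(msg)  # handed to the application immediately
        last[p] = msg             # copy kept in case the leader dies
    return delivered, last


def take_over(peers, delivered, last, dead=()):
    """The first live peer becomes leader and re-sends its last message."""
    live = [p for p in peers if p not in dead]
    new_leader, rest = live[0], live[1:]
    if last[new_leader] is not None:
        for p in rest:
            if last[new_leader] not in delivered[p]:  # drop duplicates
                delivered[p].append(last[new_leader])
    return new_leader
```

With peers `["a", "b", "c"]` and a leader that crashes after one send, `take_over` makes `a` the leader and all three end up having delivered the message. If `a` dies as well (`dead=("a",)`), `b` takes over with nothing to forward: `a` delivered the message before dying, while `b` and `c` never see it.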

nota bene

The implementation of both abcast and uacast relies on the property that if a process sends two messages to two different processes and the second message arrives, then the first message will also arrive. This is not guaranteed by Erlang if the node of the sending process crashes or is disconnected. If this happens the slaves will receive a 'noconnection' message and exit.

basic multicast

If neither ordering nor atomicity is required, the system also provides a basic multicast operation (bcast). A member process will then send the message directly to all other members in the group. This can of course be more efficient, but one must be aware that messages might not only be delivered in different orders at different members, or in violation of causal ordering, but might also reach only part of the group if the sender dies. Messages from a single sender will, however, be delivered in FIFO order.
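The difference between FIFO order and total order can be illustrated with a small Python model. The function `deliver` and its `schedule` argument are hypothetical: each sender has its own FIFO channel to a member, and the schedule says whose channel that member happens to read next.

```python
def deliver(channels, schedule):
    """Return the delivery order at one member.

    channels -- dict mapping sender to its messages in send order
    schedule -- sequence of sender names: whose channel is read next
    """
    queues = {sender: list(msgs) for sender, msgs in channels.items()}
    # Each channel is drained from the front, so per-sender order is preserved.
    return [queues[sender].pop(0) for sender in schedule]


channels = {"a": ["a1", "a2"], "b": ["b1"]}
at_x = deliver(channels, ["a", "b", "a"])
at_y = deliver(channels, ["b", "a", "a"])
```

Member x delivers `a1, b1, a2` while member y delivers `b1, a1, a2`: both see `a1` before `a2`, but they disagree on where `b1` falls.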

reliable messaging - not

The service does not provide reliable messaging, i.e. it is quite possible that a message that is sent will never be delivered. A slave will forward a message to the leader, but if the leader is dead the message is lost and the slave will not know.

causal order messaging

The system also provides sending of messages to a specified member in the group. The leader is used as a sequencer in this case as well, thus providing causal ordering. This of course means that every message is sent twice, but the added overhead is actually less than it may seem: if the group members are on different hosts it means two network hops.

the implementation

The implementation is still a work in progress, so comments are welcome.
