Några användbara klasser
I det här avsnittet ska vi titta på några användbara
och vanligt förekommande konstruktioner i flertrådade program.
Vi ska också skissa hur de kan implementeras i Java.
Litteratur
Lea: Concurrent Programming in Java, avsnitt 3.4 och 3.7
Synkroniseringsvariabler (Condition Variables) (3.4.4)
En synkroniseringsvariabel är en variabel på vilken en tråd
kan anropa wait(), notify() osv. I Java är varje Object
automatiskt en synkroniseringsvariabel. Att skapa en ny synkroniseringsvariabel
innebär alltså bara att skapa ett nytt Object. Att skriva
en egen implementation av en synkroniseringsvariabel är knappast meningsfullt.
Enda anledningen att göra det är om vi vill att den ska bete sig
på något annat sätt än Javas inbyggda synkroniseringsmekanism,
vilket sällan torde vara fallet då Javas synkronisering fungerar
bra.
Ett tillfälle då det ändå kan vara meningsfullt att
skriva en egen synkroniseringsvariabel är om ett större program
ska portas från något annat trådbibliotek. Vanligt förekommande
trådbibliotek, främst i UNIX-världen, är POSIX threads
och Solaris threads (båda skrivna i C). Dessa innehåller inget
i stil med Javas synchronized utan erbjuder i stället ett mutex
i stil med det som implementeras längre ner på denna sida. Skillnaden
mellan synkroniseringsvariabler i Java och i dessa bibliotek är att
då det inte finns något synchronized måste det
mutex som ska låsas upp vid wait() i stället explicit
skickas till synkroniseringsvariabeln. En POSIX/Solaris synkroniseringsvariabel
kan definieras på följande sätt i Java:
public interface CondVar {
public void wait(Mutex m) throws InterruptedException;
public boolean timedWait(Mutex m, long ms) throws InterruptedException;
public void signal(); //Analog to notify().
public void broadcast(); //Analog to notifyAll().
}
Förutom vad som ovan nämnts är enda skillnaden att metoderna
har andra namn. Notera att ovanstående interface inte är identiskt
med det på sid 233 i boken. I POSIX/Solaris anges mutexet, enligt ovan,
först när wait() eller timedWait() anropas. En
implementation av bokens CondVar finns i paketet util.concurrent
på bokens hemsida, se http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html.
Skulle vi nu porta ett program som använder POSIX eller Solaris trådar
till Java kan det vara listigt att implementera ovanstående synkroniseringsvariabel
och använda den i stället för Javas egen mekanism. På
så sätt blir förändringarna i programmet mindre.
Semaforer (3.4.1)
Semaforer är den äldsta synkroniseringsmekanismen, de "uppfanns"
redan 1968. En semafor kan liknas vid en kruka med kulor. När semaforen
skapas anges hur många kulor det finns i krukan från början.
Det går sedan bara att utföra två olika operationer, ta ur
en kula eller lägga tillbaks en kula. Dessa operationer kallas P respektive
V. (P och V är första bokstäverna i de holländska orden
för "ta upp" respektive "lägga tillbaks", semaforer hittades på
av en holländare) I Java kan en semafor skrivas så här:
public class Semaphore {
private int noOfMarbles;
public Semaphore(int i) {
noOfMarbles = i;
}
public synchronized void P() throws InterruptedException
{
while (noOfMarbles == 0) {
wait();
//Can not take from an empty bowl.
}
noOfMarbles--; //Take a marble.
}
public synchronized void V() {
noOfMarbles++; //Replace a
marble.
notifyAll();
}
}
En semafor som hanterar endast en kula brukar kallas binary semaphore
medans en som hanterar flera kulor kallas counting semaphore. Med hjälp
av en semafor kan både wait(), notify() och synchronized
implementeras. Det är alltså en högst användbar konstruktion
för trådhantering.
Ett lås utgörs av en binär semafor. Metoden P()
symboliserar att låsa och metoden V() att låsa upp. Notera
dock att det går att lägga tillbaks hur många kulor som
helst trots att det bara fanns en kula från början. Det är
ofta ett oönskat beteende om semaforen används som ett lås.
Bland annat därför är det numera ovanligt att lås implementeras
med hjälp av semaforer.
En nyttigare tillämpning är att använda en counting semaphore
som synkroniseringsvariabel, dvs för att implementera wait()
och notify(). Detta är speciellt lämpligt om synkroniseringsvariabeln
ska användas för att hålla reda på om det finns någon
tillgänglig av en viss resurs. Som exempel tar vi en buffert med ett
fixt, ändligt antal platser. Olika trådar läser och skriver
i bufferten och det är inte tillåtet att skriva om den är
full eller läsa om den är tom. Vi skapar två semaforer, en
för att hålla reda på hur många platser som är
tillgängliga för läsning och en för hur många platser
som är tillgängliga för skrivning. Eftersom bufferten är
tom från början initieras den förra till noll kulor och den
senare till så många kulor som det finns platser i bufferten.
Här kommer buffertimplementationen:
class BoundedBufferWithSemaphores {
protected final BufferArray buff;
protected final Semaphore putPermits;
protected final Semaphore takePermits;
public BoundedBufferWithSemaphores(int capacity)
{
buff = new BufferArray(capacity);
putPermits = new Semaphore(capacity);
takePermits = new Semaphore(0);
}
public void put(Object x) throws InterruptedException
{
putPermits.P();
buff.insert(x);
takePermits.V();
}
public Object take() throws InterruptedException
{
takePermits.P();
Object x = buff.extract();
putPermits.V();
return x;
}
}
/* A synchronized circular buffer. */
class BufferArray {
protected final Object[] array;
// the elements
protected int putPtr = 0;
// circular indices
protected int takePtr = 0;
BufferArray(int n) {
array = new Object[n];
}
synchronized void insert(Object x) {
array[putPtr] = x;
putPtr = (putPtr + 1) % array.length;
}
synchronized Object extract() {
Object x = array[takePtr];
array[takePtr] = null;
takePtr = (takePtr + 1) %
array.length;
return x;
}
}
Implementation av mutex (3.7.1)
Som tidigare nämnts kan ett mutex
definieras på följande vis:
public interface Mutex {
public void acquire() throws InterruptedException;
public void release();
public boolean attempt(long ms) throws InterruptedException;
}
Implementationen MutexImpl.java
är hämtad från paketet util.concurrent på kursbokens
hemsida, se http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html.
Hanteringen av timeouten i metoden attempt() kan verka lite krånglig
men följer riktlinjer som kommer att förklaras i anteckningarna
senare i kursen.
En annan lurig detalj är catch-blocken, vilka ser ut så
här:
catch (InterruptedException ex) {
notify();
throw ex;
}
Anledningen till att notify() anropas är att en tråd som
sover efter ett anrop av wait() kan råka ut för ett anrop
av interrupt() samtidigt som den väcks av notify() i release().
Det kan då hända att den får sig ett InterruptedException
innan den hinner börja exekvera igen. Den hamnar då i catch-blocket
där vi måste väcka nästa tråd annars har signaleringen
i release() gått förlorad. Allt detta krångel hade
gått att undvika genom att använda notifyAll() i release()
i stället, men då hade mutexet blivit långsammare. Sist i
catch-blocket kastas undantaget igen för att visa att acquire()/attempt()
blev avbruten.
Implementation av läs/skriv-lås (3.3.3.1)
Ett läs/skriv-lås kan, som tidigare
visats, ha följande enkla definition:
public interface ReadWriteLock {
public Mutex getReadLock();
public Mutex getWriteLock();
}
Gränssnittet Mutex ovan passar utmärkt som definition
av både läs- och skrivlåset. Däremot går det inte
att använda MutexImpl som impelementation eftersom läs-
och skrivlåsen inte är oberoende av varandra. I implementationen
av låsen är det lämpligt att införa fyra tillståndsvariabler
för att räkna väntande och aktiva läsare och skrivare.
Här kommer exempel på implementation, ReadWriteLockImpl.java där metoden
attempt() är inte implementerad, men det kan göras på
likartat sätt som i MutexImpl. Svaret på frågorna
från avsnittet där läs/skriv-låsen
definierades är att läsare inte släpps in om det finns
väntande skrivare, att skrivare har företräde samt att det
inte går att byta ett skrivlås mot ett läslås. Policyn
för turordning (de två första frågorna) är lätt
att ändra eftersom hanteringen av den isolerats i metoderna allowReader()
och allowWriter().