Några användbara klasser

I det här avsnittet ska vi titta på några användbara och vanligt förekommande konstruktioner i flertrådade program. Vi ska också skissa hur de kan implementeras i Java.

Litteratur

Lea: Concurrent Programming in Java, avsnitt 3.4 och 3.7

Synkroniseringsvariabler (Condition Variables) (3.4.4)

En synkroniseringsvariabel är en variabel på vilken en tråd kan anropa wait(), notify() osv. I Java är varje Object automatiskt en synkroniseringsvariabel. Att skapa en ny synkroniseringsvariabel innebär alltså bara att skapa ett nytt Object. Att skriva en egen implementation av en synkroniseringsvariabel är knappast meningsfullt. Enda anledningen att göra det är om vi vill att den ska bete sig på något annat sätt än Javas inbyggda synkroniseringsmekanism, vilket sällan torde vara fallet då Javas synkronisering fungerar bra.

Ett tillfälle då det ändå kan vara meningsfullt att skriva en egen synkroniseringsvariabel är om ett större program ska portas från något annat trådbibliotek. Vanligt förekommande trådbibliotek, främst i UNIX-världen, är POSIX threads och Solaris threads (båda skrivna i C). Dessa innehåller inget i stil med Javas synchronized utan erbjuder i stället ett mutex i stil med det som implementeras längre ner på denna sida. Skillnaden mellan synkroniseringsvariabler i Java och i dessa bibliotek är att då det inte finns något synchronized måste det mutex som ska låsas upp vid wait() i stället explicit skickas till synkroniseringsvariabeln. En POSIX/Solaris synkroniseringsvariabel kan definieras på följande sätt i Java:
public interface CondVar {
    public void wait(Mutex m) throws InterruptedException;
    public boolean timedWait(Mutex m, long ms) throws InterruptedException;
    public void signal();  //Analog to notify().
    public void broadcast();  //Analog to notifyAll().
}
Förutom vad som ovan nämnts är enda skillnaden att metoderna har andra namn. Notera att ovanstående interface inte är identiskt med det på sid 233 i boken. I POSIX/Solaris anges mutexet, enligt ovan, först när wait() eller timedWait() anropas. En implementation av bokens CondVar finns i paketet util.concurrent på bokens hemsida, se http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html

Skulle vi nu porta ett program som använder POSIX eller Solaris trådar till Java kan det vara listigt att implementera ovanstående synkroniseringsvariabel och använda den i stället för Javas egen mekanism. På så sätt blir förändringarna i programmet mindre.

Semaforer (3.4.1)

Semaforer är den äldsta synkroniseringsmekanismen, de "uppfanns" redan 1968. En semafor kan liknas vid en kruka med kulor. När semaforen skapas anges hur många kulor det finns i krukan från början. Det går sedan bara att utföra två olika operationer, ta ur en kula eller lägga tillbaks en kula. Dessa operationer kallas P respektive V. (P och V är första bokstäverna i de holländska orden för "ta upp" respektive "lägga tillbaks", semaforer hittades på av en holländare) I Java kan en semafor skrivas så här:
public class Semaphore {
    private int noOfMarbles;

    public Semaphore(int i) {
        noOfMarbles = i;
    }

    public synchronized void  P() throws InterruptedException {
        while (noOfMarbles == 0) {
            wait(); //Can not take from an empty bowl.
        }
        noOfMarbles--; //Take a marble.
    }

    public synchronized void  V() {
        noOfMarbles++; //Replace a marble.
        notifyAll();
    }

}
En semafor som hanterar endast en kula brukar kallas binary semaphore medans en som hanterar flera kulor kallas counting semaphore. Med hjälp av en semafor kan både wait(), notify() och synchronized implementeras. Det är alltså en högst användbar konstruktion för trådhantering.

Ett lås utgörs av en binär semafor. Metoden P() symboliserar att låsa och metoden V() att låsa upp. Notera dock att det går att lägga tillbaks hur många kulor som helst trots att det bara fanns en kula från början. Det är ofta ett oönskat beteende om semaforen används som ett lås. Bland annat därför är det numera ovanligt att lås implementeras med hjälp av semaforer.

En nyttigare tillämpning är att använda en counting semaphore som synkroniseringsvariabel, dvs för att implementera wait() och notify(). Detta är speciellt lämpligt om synkroniseringsvariabeln ska användas för att hålla reda på om det finns någon tillgänglig av en viss resurs. Som exempel tar vi en buffert med ett fixt, ändligt antal platser. Olika trådar läser och skriver i bufferten och det är inte tillåtet att skriva om den är full eller läsa om den är tom. Vi skapar två semaforer, en för att hålla reda på hur många platser som är tillgängliga för läsning och en för hur många platser som är tillgängliga för skrivning. Eftersom bufferten är tom från början initieras den förra till noll kulor och den senare till så många kulor som det finns platser i bufferten. Här kommer buffertimplementationen:
class BoundedBufferWithSemaphores {
    protected final BufferArray buff;
    protected final Semaphore putPermits;
    protected final Semaphore takePermits;

    public BoundedBufferWithSemaphores(int capacity) {
        buff = new BufferArray(capacity);
        putPermits = new Semaphore(capacity);
        takePermits = new Semaphore(0);
    }

    public void put(Object x) throws InterruptedException {
        putPermits.P();
        buff.insert(x);
        takePermits.V();
    }

    public Object take() throws InterruptedException {
        takePermits.P();
        Object x = buff.extract();
        putPermits.V();
        return x;
    }

}

/* A synchronized circular buffer. */
class BufferArray {
    protected final Object[]  array;      // the elements
    protected int putPtr = 0;             // circular indices
    protected int takePtr = 0;

    BufferArray(int n) {
        array = new Object[n];
    }

    synchronized void insert(Object x) {
        array[putPtr] = x;
        putPtr = (putPtr + 1) % array.length;
    }

    synchronized Object extract() {
        Object x = array[takePtr];
        array[takePtr] = null;
        takePtr = (takePtr + 1) % array.length;
        return x;
    }

}

Implementation av mutex (3.7.1)

Som tidigare nämnts kan ett mutex definieras på följande vis:
public interface Mutex {
    public void acquire() throws InterruptedException;
    public void release();
    public boolean attempt(long ms) throws InterruptedException;
}

Implementationen MutexImpl.java är hämtad från paketet util.concurrent på kursbokens hemsida, se http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html. Hanteringen av timeouten i metoden attempt() kan verka lite krånglig men följer riktlinjer som kommer att förklaras i anteckningarna senare i kursen.  

En annan lurig detalj är catch-blocken, vilka ser ut så här:
catch (InterruptedException ex) {
    notify();
    throw ex;
}

Anledningen till att notify() anropas är att en tråd som sover efter ett anrop av wait() kan råka ut för ett anrop av interrupt() samtidigt som den väcks av notify() i release(). Det kan då hända att den får sig ett InterruptedException innan den hinner börja exekvera igen. Den hamnar då i catch-blocket där vi måste väcka nästa tråd annars har signaleringen i release() gått förlorad. Allt detta krångel hade gått att undvika genom att använda notifyAll() i release() i stället, men då hade mutexet blivit långsammare. Sist i catch-blocket kastas undantaget igen för att visa att acquire()/attempt() blev avbruten.

Implementation av läs/skriv-lås (3.3.3.1)

Ett läs/skriv-lås kan, som tidigare visats, ha följande enkla definition:
public interface ReadWriteLock {
    public Mutex getReadLock();
    public Mutex getWriteLock();
}

Gränssnittet Mutex ovan passar utmärkt som definition av både läs- och skrivlåset. Däremot går det inte att använda MutexImpl som impelementation eftersom läs- och skrivlåsen inte är oberoende av varandra. I implementationen av låsen är det lämpligt att införa fyra tillståndsvariabler för att räkna väntande och aktiva läsare och skrivare. Här kommer exempel på implementation, ReadWriteLockImpl.java där metoden attempt() är inte implementerad, men det kan göras på likartat sätt som i MutexImpl. Svaret på frågorna från avsnittet där läs/skriv-låsen definierades är att läsare inte släpps in om det finns väntande skrivare, att skrivare har företräde samt att det inte går att byta ett skrivlås mot ett läslås. Policyn för turordning (de två första frågorna) är lätt att ändra eftersom hanteringen av den isolerats i metoderna allowReader() och allowWriter().