package se.kth.isk.leffe.chat.server; import java.util.Iterator; import java.util.Set; import java.util.HashSet; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.Selector; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; import java.nio.channels.ServerSocketChannel; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; /** * A multiplexed, multithreaded chat server. */ public class MultiplexedChatServer implements Runnable { private static final int PORT = 4711; private Selector selector; private ServerSocketChannel serverSocketChannel; private Set clients = new HashSet(); /** * Constructs a chat server. Note that it is intended to be started * in a separate thread. */ public MultiplexedChatServer() throws IOException { selector = Selector.open(); } /** * The run loop of the server. It opens a server socket and starts a new handler * thread for each incoming connection or data on an open connection. */ public void run() { try { openServerSocket(); while (true) { try { waitForCalls(); } catch (IOException ioe) { ioe.printStackTrace(); } } } catch (IOException ioe) { ioe.printStackTrace(); } } private void openServerSocket() throws IOException { serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); ServerSocket ss = serverSocketChannel.socket(); ss.bind(new InetSocketAddress(PORT)); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); } private void waitForCalls() throws IOException { new Thread(new CallHandler(selector.select())).start(); } private class CallHandler implements Runnable { private int numKeys; private CallHandler(int i) { numKeys = i; } public void run() { getReadyConnections(); } private void getReadyConnections() { boolean newConnection = false; Set readyClients = new HashSet(); if (numKeys > 0) { Set selectedKeys = selector.selectedKeys(); synchronized (selectedKeys) { for (Iterator i = selectedKeys.iterator(); i.hasNext();) { SelectionKey key = (SelectionKey)i.next(); newConnection = isReadyForOperation(key, SelectionKey.OP_ACCEPT); if (isReadyForOperation(key, SelectionKey.OP_READ)) { readyClients.add(key); } i.remove(); } } if (newConnection) { handleNewConnection(); } if (!readyClients.isEmpty()) { echoData(readyClients); } } } private boolean isReadyForOperation(SelectionKey key, int operation) { int readyOps = key.readyOps(); System.out.println("OP_ACCEPT:" + Boolean.toString((readyOps & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT)); System.out.println("OP_CONNECT:" + Boolean.toString((readyOps & SelectionKey.OP_CONNECT) == SelectionKey.OP_CONNECT)); System.out.println("OP_READ:" + Boolean.toString((readyOps & SelectionKey.OP_READ) == SelectionKey.OP_READ)); System.out.println("OP_WRITE:" + Boolean.toString((readyOps & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE)); return ((readyOps & operation) == operation); } private void handleNewConnection() { try { SocketChannel channel = serverSocketChannel.accept(); channel.configureBlocking(false); channel.register(selector, SelectionKey.OP_READ); clients.add(channel); } catch (IOException ioe) { ioe.printStackTrace(); } } private void echoData(Set readyClients) { ByteBuffer buffer = ByteBuffer.allocate(1024); for (Iterator rc = readyClients.iterator(); rc.hasNext();) { SocketChannel inChannel = (SocketChannel)((SelectionKey)rc.next()).channel(); try { inChannel.read(buffer); } catch (IOException ioe) { removeConnection(inChannel); ioe.printStackTrace(); } if (buffer.limit() == 0) { /* This doesn't work. buffer.limit() is always 1024 and all variants of * inChannel.isConnected() and inChannel.socket().isConnected() are always true. * How to find out if the client has been shut down? */ removeConnection(inChannel); } else { buffer.flip(); for (Iterator c = clients.iterator(); c.hasNext();) { SocketChannel outChannel = (SocketChannel)c.next(); try { outChannel.write(buffer); } catch (IOException ioe) { removeConnection(outChannel); ioe.printStackTrace(); } buffer.rewind(); } buffer.clear(); } } } private void removeConnection(SocketChannel channel) { try { System.out.println("pos1"); clients.remove(channel); channel.keyFor(selector).cancel(); channel.socket().close(); System.out.println("pos2"); } catch (IOException ioe) { ioe.printStackTrace(); } } } public static void main(String[] args) { try { new Thread(new MultiplexedChatServer()).start(); } catch (IOException ioe) { ioe.printStackTrace(); } } }