GearmanNIOJobServerConnection.java

/*
 * Copyright (C) 2012 by Eric Lambert <eric.d.lambert@gmail.com>
 * Use and distribution licensed under the BSD license.  See
 * the COPYING file in the parent directory for full text.
 */
package org.gearman.common;

import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

import org.slf4j.LoggerFactory;

public class GearmanNIOJobServerConnection
        implements GearmanJobServerIpConnection {

    static final String DESCRIPTION_PREFIX = "GearmanNIOJobServerConnection";
    private final String DESCRIPTION;
    private final String host;
    private final int port;
    private final InetSocketAddress remote;
    private SocketChannel serverConnection = null;
    private Selector selector = null;
    private SelectionKey selectorKey = null;
    private static final org.slf4j.Logger LOG =  LoggerFactory.getLogger(
            Constants.GEARMAN_CLIENT_LOGGER_NAME);
    private ByteBuffer bytesReceived;
    private ByteBuffer bytesToSend;

    public GearmanNIOJobServerConnection(String hostname)
            throws IllegalArgumentException {
        this(hostname, Constants.GEARMAN_DEFAULT_TCP_PORT);
    }

    public GearmanNIOJobServerConnection(String hostname, int port)
            throws IllegalArgumentException {
        this(new InetSocketAddress(hostname, port));
    }

    public GearmanNIOJobServerConnection(InetSocketAddress remote)
            throws IllegalArgumentException {
        if (remote == null) {
            throw new IllegalArgumentException("Remote can not be null");
        }
        this.remote = remote;
        this.host = remote.getHostName();
        this.port = remote.getPort();
        bytesReceived = ByteBuffer.allocate(
                Constants.GEARMAN_DEFAULT_SOCKET_RECV_SIZE);
        bytesToSend = ByteBuffer.allocate(
                Constants.GEARMAN_DEFAULT_SOCKET_SEND_SIZE);
        DESCRIPTION = DESCRIPTION_PREFIX + ":" + remote.toString();
    }

    @Override
    public String toString() {
        return DESCRIPTION;
    }

    public void open() throws IOException {
        if (isOpen()) {
            throw new IllegalStateException("A session can not be " +
                    "initialized twice");
        }
        try {
            serverConnection = SocketChannel.open(remote);
            serverConnection.socket().setTcpNoDelay(true);
            serverConnection.socket().setSoLinger(true,
                    Constants.GEARMAN_DEFAULT_SOCKET_TIMEOUT);
            serverConnection.socket().setSoTimeout(
                    Constants.GEARMAN_DEFAULT_SOCKET_TIMEOUT * 1000);
            serverConnection.socket().setReceiveBufferSize(
                    Constants.GEARMAN_DEFAULT_SOCKET_RECV_SIZE);
            serverConnection.socket().setSendBufferSize(
                    Constants.GEARMAN_DEFAULT_SOCKET_RECV_SIZE);
            serverConnection.configureBlocking(false);
            serverConnection.finishConnect();
            selector = Selector.open();
            selectorKey = serverConnection.register(selector,
                    SelectionKey.OP_WRITE | SelectionKey.OP_READ);
        } catch (IOException ioe) {
//            LOG.warn("Received IOException while attempting to" +
//                    " initialize session " + this +
//                    ". Shuting down session", ioe);
            if (serverConnection != null && serverConnection.isOpen()) {
                if (selector != null && selector.isOpen()) {
                    try {
                        selector.close();
                    } catch (IOException selioe) {
                        LOG.warn("Received IOException while" +
                                " attempting to close selector.", selioe);
                    }
                }
                try {
                    serverConnection.close();
                } catch (IOException closeioe) {
                    LOG.warn("Received IOException while" +
                            " attempting to close connection to server. " +
                            "Giving up!", closeioe);
                }
            }
            throw ioe;
        }
        LOG.info("Connection " + this + " has been opened");
    }

    public void close() {
        if (!isOpen()) {
            throw new IllegalStateException("Can not close a session that " +
                    "has not been initialized");
        }
        LOG.info( "Session " + this + " is being closed.");
        selectorKey.cancel();
        try {
            selector.close();
        } catch (IOException ioe) {
            LOG.warn("Received IOException while attempting to " +
                    "close selector attached to session " + this, ioe);
        } finally {
            try {
                serverConnection.close();
            } catch (IOException cioe) {
                LOG.warn("Received IOException while attempting" +
                        " to close connection for session " + this, cioe);
            }
            serverConnection = null;
        }
        LOG.info( "Connection " + this + " has successfully closed.");
    }

    public void write(GearmanPacket request) throws IOException {

        if (request == null && bytesToSend.position() == 0) {
            return;
        }
        if (request != null) {
            int ps = request.getData().length +
                    Constants.GEARMAN_PACKET_HEADER_SIZE;
            if (bytesToSend.remaining() < ps) {
                int newCapacity = bytesToSend.capacity() * 2;
                while (newCapacity < ps && newCapacity > 0) {
                    newCapacity *=2;
                }
                bytesToSend = growBuffer(bytesToSend, newCapacity);
            }
            byte[] bytes = request.toBytes();
            ByteBuffer bb = ByteBuffer.allocate(bytes.length);
            bb.put(bytes);
            bb.rewind();
            bytesToSend.put(bb);
        }
        selector.selectNow();
        if (selectorKey.isWritable()) {
            selector.selectedKeys().remove(selectorKey);
            //Lets never write more than DEFAULT_SOCKET_SEND_SIZE, so if the
            //buffersize is larger than this, set the limit to default
            int newLimit = bytesToSend.position();
            int oldLimit = newLimit;
            if (newLimit > Constants.GEARMAN_DEFAULT_SOCKET_SEND_SIZE) {
                newLimit = Constants.GEARMAN_DEFAULT_SOCKET_SEND_SIZE;
            }
            bytesToSend.limit(newLimit);
            bytesToSend.rewind();
            int bytesSent = serverConnection.write(bytesToSend);
            bytesToSend.limit(oldLimit);
            bytesToSend.compact();
            LOG.debug("Write command wrote " + bytesSent + " to " +
                    this + ". " + bytesToSend.position() + " bytes left in " +
                    "send buffer");
        } else {
            LOG.debug("Write command can not write request: " +
                    "Selector for " + this + " is not available for write. " +
                    "Will buffer request and send it later.");
        }
    }

    public GearmanPacket read() throws IOException {
        GearmanPacket returnPacket = null;
        selector.selectNow();
        if (selectorKey.isReadable()) {
            selector.selectedKeys().remove(selectorKey);
            if (!bytesReceived.hasRemaining()) {
                bytesReceived = growBuffer(bytesReceived);
            }
            int bytesRead = serverConnection.read(bytesReceived);
            if (bytesRead >= 0) {
                LOG.debug( "Session " + this + " has read " +
                        bytesRead + " bytes from its job server. Buffer has " +
                        bytesReceived.remaining());
            } else {
        	    if (isOpen()) {
        	        close();
                }
                //TODO do something smarter here
                throw new IOException("Connection to job server severed");
            }
        } else {
            LOG.debug("Read command can not read request from" +
                    "session: Selector for " + this + " is not available " +
                    "for read. ");
        }
        if (bufferContainsCompletePacket(bytesReceived)) {
            byte[] pb = new byte[getSizeOfPacket(bytesReceived)];
            bytesReceived.limit(bytesReceived.position());
            bytesReceived.rewind();
            bytesReceived.get(pb);
            bytesReceived.compact();
            returnPacket = new GearmanPacketImpl(new BufferedInputStream(
                    new ByteArrayInputStream(pb)));
        }
        return returnPacket;
    }

    public SelectionKey registerSelector(Selector s, int mask)
            throws IOException {
        return serverConnection.register(s, mask);

    }

    public boolean canRead() {
        if (!selector.isOpen()) {
            return false;
        }
        try {
            selector.selectNow();
        } catch (IOException ioe) {
            LOG.warn("Failed to select on connection " +
                    this, ioe);
        }
        return (selectorKey.isReadable() ||
                bufferContainsCompletePacket(bytesReceived));
    }

    public boolean canWrite() {
        if (!selector.isOpen()) {
            return false;
        }
        try {
            selector.selectNow();
        } catch (IOException ioe) {
            LOG.warn("Connection Failed to select on socket " +
                    this, ioe);
        }
        return (selectorKey.isWritable());
    }

    public boolean hasBufferedWriteData() {
        return bytesToSend.position() > 0;
    }

    public Selector getSelector() {
        return selector;
    }

    public boolean isOpen() {
        return (serverConnection != null && serverConnection.isConnected());
    }

    @Override
    public boolean equals(Object that) {
        if (that == null) {
            return false;
        }

        if (this == that) {
            return true;
        }

        if (!(that instanceof GearmanNIOJobServerConnection)) {
            return false;
        }

        InetSocketAddress thatRemote =
                ((GearmanNIOJobServerConnection) that).remote;

        return this.remote.equals(thatRemote);
    }

    // When you override equals you should override hashcode as well, since
    // two equal objects should have the same hashcode
    @Override
    public int hashCode() {
        return this.remote == null ? 0 : this.remote.hashCode();
    }

    private boolean bufferContainsCompletePacket(ByteBuffer b) {
        if (b.position() < Constants.GEARMAN_PACKET_HEADER_SIZE) {
            return false;
        }
        return b.position() >= getSizeOfPacket(b) ? true : false;
    }

    // DO NOT CALL UNLESS YOU ARE SURE THAT BYTEBUFFER HAS AT LEAST
    // GEARMAN_PACKET_HEADER_SIZE BYTES!
    private int getSizeOfPacket(ByteBuffer buffer) {
        int originalPosition = buffer.position();
        byte[] header = new byte[Constants.GEARMAN_PACKET_HEADER_SIZE];
        buffer.rewind();
        buffer.get(header);
        buffer.position(originalPosition);
        GearmanPacketHeader ph = new GearmanPacketHeader(header);
        return ph.getDataLength() + Constants.GEARMAN_PACKET_HEADER_SIZE;
    }

    private ByteBuffer growBuffer(ByteBuffer originalBuffer)
            throws IllegalArgumentException {
        return growBuffer(originalBuffer, originalBuffer.capacity() * 2);
    }

    private ByteBuffer growBuffer(ByteBuffer orginalBuffer, int newCapacity)
            throws IllegalArgumentException{
        if (newCapacity < orginalBuffer.capacity()) {
            throw new IllegalArgumentException("The new capacity of the " +
                    "buffer (" + newCapacity + ") may not be less than the" +
                    " orginal capacity (" + orginalBuffer.capacity()+")");
        }
        orginalBuffer.flip();
        ByteBuffer newBuffer = ByteBuffer.allocate(newCapacity);
        newBuffer.put(orginalBuffer);
        return newBuffer;


    }

	public String getHost() {
        return host;
	}

	public int getPort() {
        return port;
	}
}