GearmanJobServerSession.java

/*
 * Copyright (C) 2009 by Eric Lambert <Eric.Lambert@sun.com>
 * Use and distribution licensed under the BSD license.  See
 * the COPYING file in the parent directory for full text.
 */
package org.gearman.common;

import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.gearman.client.GearmanIOEventListener;
import org.slf4j.LoggerFactory;

public class GearmanJobServerSession
        implements GearmanSessionEventHandler, GearmanIOEventListener {

    static final String DESCRIPTION_PREFIX = "GearmanJobServerSession";
    private final String DESCRIPTION;
    private final GearmanNIOJobServerConnection connection;
    private Queue<GearmanPacket> packetsToWrite = null;
    private static final org.slf4j.Logger LOG =  LoggerFactory.getLogger(
            Constants.GEARMAN_SESSION_LOGGER_NAME);
    private SelectionKey sessionSelectionKey = null;
    private GearmanSessionEventHandler responseHandler = null;
    private Queue<GearmanTask> newTaskList = null;
    private Queue<GearmanTask> tasksAwaitingAckList = null;

    public GearmanJobServerSession(GearmanJobServerConnection conn)
            throws IllegalArgumentException {
        if (!(conn instanceof GearmanNIOJobServerConnection)) {
            throw new IllegalArgumentException("Session currently only " +
                    "supports instances of " +
                    GearmanNIOJobServerConnection.class.getName());
        }
        connection = (GearmanNIOJobServerConnection) conn;
        DESCRIPTION = DESCRIPTION_PREFIX + ":" +
                Thread.currentThread().getId() + ":" + conn.toString();
    }

    @Override
    public String toString() {
        return DESCRIPTION;
    }

    public void initSession(Selector sel, GearmanSessionEventHandler handler)
            throws IllegalStateException, IOException {
        if (isInitialized()) {
            throw new IllegalStateException("A session can not be " +
                    "initialized twice");
        }
        connection.open();
        packetsToWrite = new LinkedList<GearmanPacket>();
        sessionSelectionKey = connection.registerSelector(sel,
                SelectionKey.OP_READ);
        this.responseHandler = handler;
        newTaskList = new LinkedList<GearmanTask>();
        tasksAwaitingAckList = new LinkedList<GearmanTask>();
        LOG.info("Session " + this + " has been initialized.");      //NOPMD
    }

    public GearmanJobServerConnection getConnection() {
        return connection;
    }

    public SelectionKey getSelectionKey() {
        if (!isInitialized()) {
            throw new IllegalStateException("Session " + this +
                    " has not been initialized");
        }
        return sessionSelectionKey;
    }

    public boolean isInitialized() {
        return (connection != null && connection.isOpen());
    }

    public void waitForTasksToComplete()
            throws IllegalStateException, InterruptedException {
        if (!isInitialized()) {
            throw new IllegalStateException("Session " + this +
                    " has not been initialized");
        }
        try {
            waitForTasksToComplete(-1, TimeUnit.SECONDS);
        } catch (TimeoutException ignore) {
            //THIS SHOULD NEVER HAPPEN
            LOG.warn("Unexpected timeout exception received " +
                    "while waiting for current session task to complete. " +
                    "Timeout value was set to -1, which means do not timeout, " +
                    "yet it did. Go figure.",ignore);
        }
    }

    public void waitForTasksToComplete(long timeout, TimeUnit unit)
            throws InterruptedException, TimeoutException, IllegalStateException {
        if (!isInitialized()) {
            throw new IllegalStateException("Session " + this +
                    " has not been initialized");
        }
        long timeOutInMills = timeout < 0 ? -1 :
            TimeUnit.MILLISECONDS.convert(timeout, unit) +
                System.currentTimeMillis();
        while ((newTaskList.size() > 0 || tasksAwaitingAckList.size() > 0) &&
                !(timeOutInMills < 0 ? false :
                    System.currentTimeMillis() > timeOutInMills)) {
            try {
                driveSessionIO();
            } catch (IOException ioe) {
                LOG.warn("Receieved an IO Exception while" +
                        " driving io for session " + this, ioe);
            }
            Thread.sleep(100);
        }
        if (newTaskList.size() > 0 || tasksAwaitingAckList.size() > 0) {
            throw new TimeoutException("Session " + this + " timed out " +
                    "waiting for all requests to complete");
        }
    }

    public void closeSession() {
        if (!isInitialized()) {
            LOG.warn("Attempted to close a session that is not open: " +
                    toString());
            return;
        }
        LOG.info("Session " + this + " is being closed.");
        sessionSelectionKey.cancel();
        connection.close();
        packetsToWrite.clear();
        packetsToWrite = null;
        tasksAwaitingAckList.clear();
        newTaskList.clear();
        LOG.info("Session " + this + " has successfully closed.");
    }

    public void submitTask(GearmanTask task)
            throws IllegalStateException {
        if (!isInitialized()) {
            throw new IllegalStateException("Session hasnt been initialized." +
                    " Request may not be submitted at this time");
        }
        if (task == null) {
            throw new IllegalStateException("A null request can not be" +
                    " submitted to a server");
        }

        if (!task.getState().equals(GearmanTask.State.NEW)) {
            throw new IllegalStateException("Invalid task state: " +
                    task.getState());
        }

        newTaskList.add(task);
        packetsToWrite.add(task.getRequestPacket());
        sessionSelectionKey.interestOps(sessionSelectionKey.interestOps() |
                SelectionKey.OP_WRITE);
        LOG.info( "Session " + this + " is now handling " +
                "the task " + task);
    }

    public void handleGearmanIOEvent(GearmanPacket event)
            throws IllegalArgumentException {
        if (event == null) {
            throw new IllegalArgumentException("Can not handle a null event");
        }
        if (event.getMagic().equals(GearmanPacketMagic.RES)) {
            throw new IllegalArgumentException("Can not handle a Result event");
        }

        GearmanTask t = new GearmanTask(event);
        submitTask(t);
    }

    public void driveSessionIO()
            throws IOException, GearmanException, IllegalStateException {
        GearmanPacket p = null;
        if (!isInitialized()) {
            throw new IllegalStateException("you can not driveSessionIO on an" +
                    " un-initialized session: " + toString());
        }

        while (sessionHasDataToWrite() && canWrite()) {
            if (packetsToWrite.isEmpty()) {
                connection.write(null);
            } else {
                p = packetsToWrite.remove();
                connection.write(p);
                handleSessionEvent(new GearmanSessionEvent(p, this));           //NOPMD
            }
        }
        if (!sessionHasDataToWrite()) {
            sessionSelectionKey.interestOps(SelectionKey.OP_READ);
        }

        while (canRead()) {
            p = connection.read();
            if (p == null) {
                continue;
            }
            handleSessionEvent(new GearmanSessionEvent(p, this));               //NOPMD
        }
    }

    @Override
    public boolean equals(Object that) {
        if (that == null) {
            return false;
        }

        if (!(that instanceof GearmanJobServerSession)) {
            return false;
        }

        GearmanJobServerSession thatSession = (GearmanJobServerSession) that;

        return this.connection.equals(thatSession.connection);
    }

    @Override
    public int hashCode() {
        return connection.hashCode();
    }

    public void handleSessionEvent(GearmanSessionEvent event)
            throws IllegalArgumentException, IllegalStateException {
        if (event == null) {
            throw new IllegalArgumentException("Event can not be null");
        }
        GearmanPacket p = event.getPacket();
        if (p == null) {
            throw new IllegalArgumentException("Event does not have a packet");
        }
        GearmanPacketMagic m = p.getMagic();
        if (m.equals(GearmanPacketMagic.REQ)) {
            handleReqSessionEvent(event);
        } else if (m.equals(GearmanPacketMagic.RES)) {
            handleResSessionEvent(event);
        } else {
            throw new IllegalStateException("Event has bad magic type " + m);
        }
    }

    public int getNumberOfActiveTasks() throws IllegalStateException {
        if (!isInitialized()) {
            throw new IllegalStateException("Session hasnt been initialized.");
        }
        return tasksAwaitingAckList.size() + newTaskList.size();
    }

    public boolean sessionHasDataToWrite() {
        if (connection == null || !connection.isOpen()) {
            return false;
        }
        return packetsToWrite.isEmpty() ? connection.hasBufferedWriteData() : true;
    }

    private void handleReqSessionEvent(GearmanSessionEvent event)
            throws IllegalStateException {
        GearmanPacket p = event.getPacket();
        GearmanTask t = newTaskList.remove();
        if (t == null) {
            String msg = "Session has received  request event " + event.packet +
                    " but has no task in new task queue.";
            LOG.error(msg);
            throw new IllegalStateException(msg);
        }
        t.handleGearmanIOEvent(p);
        GearmanTask.State state = t.getState();
        LOG.info("Session " + this + " handling a " +
                GearmanPacketMagic.REQ + "/" + p.getPacketType() + " event");
        switch (state) {
            case SUBMITTED:
                tasksAwaitingAckList.add(t);
                LOG.info("Added task " + t.getRequestPacket().getPacketType() +
                        " to taskAwaiting list. List size = " +
                        tasksAwaitingAckList.size() + "( Event was " + p.getPacketType() + ")");
                break;
            case FINISHED:
                break;
            default:
                String code = "000";
                String msg = "Task in invalid state (State = " + state +
                        ") after submission to server";
                LOG.warn(msg);
                responseHandler.handleSessionEvent(new GearmanSessionEvent(
                        new GearmanPacketImpl(
                        GearmanPacketMagic.RES, GearmanPacketType.ERROR,
                        GearmanPacketImpl.generatePacketData(code.getBytes(),
                        msg.getBytes())), this));
        }

    }

    private void handleResSessionEvent(GearmanSessionEvent event) {
        GearmanPacket p = event.getPacket();
        GearmanPacketType t = p.getPacketType();
        GearmanTask task = tasksAwaitingAckList.peek();
        GearmanPacketType taskType = null;
                LOG.info("Session " + this + " handling a " +
                GearmanPacketMagic.RES + "/" + p.getPacketType() + " event");
        switch (t) {
            case JOB_CREATED:
                taskType = task.getRequestPacket().getPacketType();
                if (GearmanPacketType.isJobSubmission(taskType)) {
                    task.handleGearmanIOEvent(p);
                    responseHandler.handleSessionEvent(event);
                } else {
                    handleTypeMismatch("Job Submission", taskType.toString());
                }
                break;
            case NO_JOB:
                taskType = task.getRequestPacket().getPacketType();
                if (taskType.equals(GearmanPacketType.GRAB_JOB) ||
                        taskType.equals(GearmanPacketType.GRAB_JOB_UNIQ)) {
                    task.handleGearmanIOEvent(p);
                } else {
                    handleTypeMismatch(GearmanPacketType.GRAB_JOB + " or " +
                            GearmanPacketType.GRAB_JOB_UNIQ,
                            taskType.toString());
                }
                break;
            case NOOP:
                if (task != null) {
                    taskType = task.getRequestPacket().getPacketType();
                    if (taskType.equals(GearmanPacketType.PRE_SLEEP)) {
                        task.handleGearmanIOEvent(p);
                    } else {
                        return;
                    }
                }
                break;
            case JOB_ASSIGN:
                taskType = task.getRequestPacket().getPacketType();
                if (taskType.equals(GearmanPacketType.GRAB_JOB)) {
                    task.handleGearmanIOEvent(p);
                } else {
                    handleTypeMismatch(GearmanPacketType.GRAB_JOB.toString(),
                            taskType.toString());
                }
                break;
            case JOB_ASSIGN_UNIQ:
                taskType = task.getRequestPacket().getPacketType();
                if (taskType.equals(GearmanPacketType.GRAB_JOB_UNIQ)) {
                    task.handleGearmanIOEvent(p);
                } else {
                    handleTypeMismatch(GearmanPacketType.GRAB_JOB_UNIQ.toString(),
                            taskType.toString());
                }
                break;
            case STATUS_RES:
                taskType = task.getRequestPacket().getPacketType();
                if (taskType.equals(GearmanPacketType.GET_STATUS)) {
                    task.handleGearmanIOEvent(p);
                } else {
                    handleTypeMismatch(GearmanPacketType.GET_STATUS.toString(),
                            taskType.toString());
                }
                break;
            case ECHO_RES:
                taskType = task.getRequestPacket().getPacketType();
                if (taskType.equals(GearmanPacketType.ECHO_REQ)) {
                    task.handleGearmanIOEvent(p);
                } else {
                    handleTypeMismatch(GearmanPacketType.ECHO_REQ.toString(),
                            taskType.toString());
                }
                break;
            case OPTION_RES:
                taskType = task.getRequestPacket().getPacketType();
                if (taskType.equals(GearmanPacketType.OPTION_REQ)) {
                    task.handleGearmanIOEvent(p);
                } else {
                    handleTypeMismatch(GearmanPacketType.OPTION_REQ.toString(),
                            taskType.toString());
                }
                break;
            default:
                responseHandler.handleSessionEvent(event);
                return;
        }

        if (task != null ) {
            if (task.getState().compareTo(GearmanTask.State.SUBMITTED) > 0) {
                tasksAwaitingAckList.remove();
                LOG.info("Removed task " + task.getRequestPacket().getPacketType() +
                        " from taskAwaiting list. List size = " +
                        tasksAwaitingAckList.size() + "( Event was " +
                        event.packet.getPacketType() + ")");
            } else {
                LOG.warn("Task " + task + " still in submitted " +
                        "state after receiving acknowlegement from server. " +
                        "Ack = " + p);

            }
        }

    }

    private void handleTypeMismatch(String expected, String got) {
        String code = "000";
        String msg = "Received " + got + " response from server," +
                " but last request was not " + expected;
        LOG.warn(msg);
        responseHandler.handleSessionEvent(
                new GearmanSessionEvent(new GearmanPacketImpl(
                GearmanPacketMagic.RES,
                GearmanPacketType.ERROR,
                GearmanPacketImpl.generatePacketData(code.getBytes(),
                msg.getBytes())), this));
    }

    private boolean canWrite() {
        if (connection == null) {
            return false;
        }
        return connection.canWrite();
    }

    private boolean canRead() {
        if (connection == null) {
            return false;
        }
        return connection.canRead();
    }

}