GearmanWorkerImpl.java

/*
 * Copyright (C) 2013 by Eric Lambert <eric.d.lambert@gmail.com>
 * Use and distribution licensed under the BSD license.  See
 * the COPYING file in the parent directory for full text.
 */
package org.gearman.worker;

import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;

import org.gearman.common.Constants;
import org.gearman.common.GearmanException;
import org.gearman.common.GearmanJobServerConnection;
import org.gearman.common.GearmanJobServerIpConnectionFactory;
import org.gearman.common.GearmanJobServerSession;
import org.gearman.common.GearmanNIOJobServerConnectionFactory;
import org.gearman.common.GearmanPacket;
import org.gearman.common.GearmanPacket.DataComponentName;
import org.gearman.common.GearmanPacketImpl;
import org.gearman.common.GearmanPacketMagic;
import org.gearman.common.GearmanPacketType;
import org.gearman.common.GearmanServerResponseHandler;
import org.gearman.common.GearmanSessionEvent;
import org.gearman.common.GearmanSessionEventHandler;
import org.gearman.common.GearmanTask;

import org.gearman.util.ByteUtils;
import org.slf4j.LoggerFactory;

public class GearmanWorkerImpl
        implements GearmanWorker, GearmanSessionEventHandler {

    static public enum State {

        IDLE, RUNNING, SHUTTINGDOWN
    }
    private static final String DESCRIPION_PREFIX = "GearmanWorker";
    private Queue<GearmanFunction> functionList = null;
    private Selector ioAvailable = null;
    private static final org.slf4j.Logger LOG =  LoggerFactory.getLogger(
            Constants.GEARMAN_WORKER_LOGGER_NAME);
    private String id;
    private Map<String, FunctionDefinition> functionMap;
    private State state;
    private ExecutorService executorService;
    private Map<GearmanJobServerSession, GearmanTask> taskMap = null;
    private Map<SelectionKey,GearmanJobServerSession> sessionMap = null;
    private final GearmanJobServerIpConnectionFactory connFactory = new GearmanNIOJobServerConnectionFactory();
    private volatile boolean jobUniqueIdRequired = false;

    class GrabJobEventHandler implements GearmanServerResponseHandler {

        private final GearmanJobServerSession session;
        private boolean isDone = false;

        GrabJobEventHandler(GearmanJobServerSession session) {
            super();
            this.session = session;
        }

        public void handleEvent(GearmanPacket event) throws GearmanException {
            handleSessionEvent(new GearmanSessionEvent(event, session));
            isDone = true;
        }

        public boolean isDone() {
            return isDone;
        }
    }

    static class FunctionDefinition {

        private final long timeout;
        private final GearmanFunctionFactory factory;

        FunctionDefinition(long timeout, GearmanFunctionFactory factory) {
            this.timeout = timeout;
            this.factory = factory;
        }

        long getTimeout() {
            return timeout;
        }

        GearmanFunctionFactory getFactory() {
            return factory;
        }
    }

    public GearmanWorkerImpl() {
        this (null);
    }

    public GearmanWorkerImpl(ExecutorService executorService) {
        functionList = new LinkedList<GearmanFunction>();
        id = DESCRIPION_PREFIX + ":" + Thread.currentThread().getId();
        functionMap = new HashMap<String, FunctionDefinition>();
        state = State.IDLE;
        this.executorService = executorService;
        taskMap = new HashMap<GearmanJobServerSession, GearmanTask>();
        sessionMap = new ConcurrentHashMap<SelectionKey, GearmanJobServerSession>();
    }

    @Override
    public String toString() {
        return id;
    }

    public void work() {
        if (!state.equals(State.IDLE)) {
            throw new IllegalStateException("Can not call work while worker " +
                    "is running or shutting down");
        }

        state = State.RUNNING;
        // a map keeping track of sessions with connection errors
        // (to avoid printing an error about them in every reconnect attempt)
        Map<GearmanJobServerSession, Boolean> havingConnectionError = new HashMap<GearmanJobServerSession, Boolean>();

        while (isRunning()) {

            // look for sessions which have been disconnected and attempt to reconnect.
            for (Iterator<GearmanJobServerSession> iter = sessionMap.values().iterator(); iter.hasNext();) {
                GearmanJobServerSession sess = iter.next();
                if (!sess.isInitialized()) {
                    try {

                        // reconnect, unregister old selection key and register new one
                        SelectionKey oldKey = sess.isInitialized() ? sess.getSelectionKey() : null;
                        sess.initSession(ioAvailable, this);
                        if (oldKey != null) {
                            iter.remove();
                        }
                        sessionMap.put(sess.getSelectionKey(), sess);

                        // register all functions with the newly reconnected server
                        for (FunctionDefinition d : functionMap.values()) {
                            GearmanTask gsr = new GearmanTask(null, generateCanDoPacket(d));
                            sess.submitTask(gsr);
                        }
                        GearmanTask sessTask = new GearmanTask(
                                new GrabJobEventHandler(sess),
                                new GearmanPacketImpl(GearmanPacketMagic.REQ,
                                getGrabJobPacketType(), new byte[0]));
                        sess.submitTask(sessTask);
                        sess.driveSessionIO();

                        // log reconnection message
                        if (havingConnectionError.get(sess)) {
                            LOG.info("Re-established connection to " + sess.getConnection().toString());
                        }
                        havingConnectionError.put(sess, false);
                    } catch (IOException e) {
                        if (!havingConnectionError.get(sess)) {
                            LOG.warn("Error connecting to " + sess + ", will keep trying..");
                        }

                        havingConnectionError.put(sess, true);

                        try {
                            Thread.sleep(50);
                        } catch (InterruptedException e1) {
                        }
                    }
                } else {
                    havingConnectionError.put(sess, false);
                }
            }

            for (GearmanJobServerSession sess : sessionMap.values()) {
            	// if still disconnected, skip
            	if (!sess.isInitialized()) {
            		continue;
            	}
                int interestOps = SelectionKey.OP_READ;
                if (sess.sessionHasDataToWrite()) {
                    interestOps |= SelectionKey.OP_WRITE;
                }
                sess.getSelectionKey().interestOps(interestOps);
            }
            try {
                ioAvailable.select(1);
            } catch (IOException io) {
                LOG.warn("Receieved IOException while" +
                        " selecting for IO",io);
            }

            for (SelectionKey key : ioAvailable.selectedKeys()) {
                 GearmanJobServerSession sess = sessionMap.get(key);
                 if (sess == null) {
                     LOG.warn("Worker does not have " +
                             "session for key " + key);
                     continue;
                 }
                if (!sess.isInitialized()) {
                    continue;
                }
                try {
                    GearmanTask sessTask = taskMap.get(sess);
                    if (sessTask == null) {
                        sessTask = new GearmanTask(                             //NOPMD
                                new GrabJobEventHandler(sess),
                                new GearmanPacketImpl(GearmanPacketMagic.REQ,
                                getGrabJobPacketType(), new byte[0]));
                        taskMap.put(sess, sessTask);
                        sess.submitTask(sessTask);
                        LOG.debug("Worker: " + this + " submitted a " +
                                sessTask.getRequestPacket().getPacketType() +
                                " to session: " + sess);
                    }
                    sess.driveSessionIO();
                    //For the time being we will execute the jobs synchronously
                    //in the future, I expect to change this.
                    if (!functionList.isEmpty()) {
                        GearmanFunction fun = functionList.remove();
                        submitFunction(fun);
                    }
                } catch (IOException ioe) {
                    LOG.warn("Received IOException while driving" +
                            " IO on session " + sess, ioe);
                	sess.closeSession();
                    continue;
                }
            }
        }

        shutDownWorker(true);
    }

    public void handleSessionEvent(GearmanSessionEvent event)
            throws IllegalArgumentException, IllegalStateException {
        GearmanPacket p = event.getPacket();
        GearmanJobServerSession s = event.getSession();
        GearmanPacketType t = p.getPacketType();
        LOG.debug("Worker " + this + " handling session event" +
                " ( Session = " + s + " Event = " + t + " )");
        switch (t) {
            case JOB_ASSIGN:
            	//TODO Figure out what the right behavior is if JobUUIDRequired was false when we submitted but is now true
                taskMap.remove(s);
                addNewJob(event);
                break;
            case JOB_ASSIGN_UNIQ:
            	//TODO Figure out what the right behavior is if JobUUIDRequired was true when we submitted but is now false
                taskMap.remove(s);
                addNewJob(event);
                break;
            case NOOP:
                taskMap.remove(s);
                break;
            case NO_JOB:
                GearmanTask preSleepTask = new GearmanTask(new GrabJobEventHandler(s),
                        new GearmanPacketImpl(GearmanPacketMagic.REQ,
                        GearmanPacketType.PRE_SLEEP, new byte[0]));
                taskMap.put(s, preSleepTask);
                s.submitTask(preSleepTask);
                break;
            case ECHO_RES:
                break;
            case OPTION_RES:
                break;
            case ERROR:
                s.closeSession();
                break;
            default:
                LOG.warn("Received unknown packet type " + t +
                        " from session " + s + ". Closing connection.");
                s.closeSession();
        }
    }

    public boolean addServer(String host, int port) {
    	return addServer(connFactory.createConnection(host, port));
    }

    public boolean addServer(GearmanJobServerConnection conn)
            throws IllegalArgumentException, IllegalStateException {

        if (conn == null) {
            throw new IllegalArgumentException("Connection can not be null");
        }
        //this is a sub-optimal way to look for dups, but addJobServer
        //ops should be infrequent enough that this should be a big penalty
        for (GearmanJobServerSession sess : sessionMap.values()) {
            if (sess.getConnection().equals(conn)) {
                return true;
            }
        }

        GearmanJobServerSession session =
                new GearmanJobServerSession(conn);
        if (ioAvailable == null) {
            try {
                ioAvailable = Selector.open();
            } catch (IOException ioe) {
                LOG.warn("Failed to connect to job server "
                    + conn + ".",ioe);
                return false;
            }
        }
        try {
            session.initSession(ioAvailable, this);
        } catch (IOException ioe) {
            LOG.warn("Failed to initialize session with job server "
                    + conn + ".",ioe);
            return false;
        }
        SelectionKey key = session.getSelectionKey();
        if (key == null) {
            String msg = "Session " + session + " has a null " +
                    "selection key. Server will not be added to worker.";
            LOG.warn(msg);
            throw new IllegalStateException(msg);
        }
        sessionMap.put(key, session);

        GearmanPacket p = new GearmanPacketImpl(GearmanPacketMagic.REQ,
                GearmanPacketType.SET_CLIENT_ID, ByteUtils.toUTF8Bytes(id));
        session.submitTask(new GearmanTask(p));

        for (FunctionDefinition def : functionMap.values()) {
            p = generateCanDoPacket(def);
            session.submitTask(new GearmanTask(p));                              //NOPMD
        }

        p = new GearmanPacketImpl(GearmanPacketMagic.REQ,
                getGrabJobPacketType(), new byte[0]);
        GearmanTask gsr = new GearmanTask(
                new GrabJobEventHandler(session), p);
        taskMap.put(session, gsr);
        session.submitTask(gsr);

        LOG.debug("Added server " + conn + " to worker " + this);
        return true;
    }

    public boolean hasServer(GearmanJobServerConnection conn) {
        boolean foundIt = false;
        for (GearmanJobServerSession sess : sessionMap.values()) {
            if (sess.getConnection().equals(conn)) {
                foundIt = true;
            }
        }
        return foundIt;
    }

    public String echo(String text, GearmanJobServerConnection conn) {
        throw new UnsupportedOperationException("Not supported yet.");
    }

    public void registerFunction(String function, long timeout) {
        registerFunctionFactory(new DefaultGearmanFunctionFactory(function),
                timeout);
    }

    public void registerFunction(String function) {
        registerFunction(function, 0);
    }

    public void registerFunction(Class<? extends GearmanFunction> function) {
        registerFunction(function, 0);
    }

    public void registerFunction(Class<? extends GearmanFunction> function,
            long timeout) {
        registerFunctionFactory(new DefaultGearmanFunctionFactory(
                function.getName()), timeout);
    }

    public void registerFunctionFactory(GearmanFunctionFactory factory) {
        registerFunctionFactory(factory, 0);
    }

    public void registerFunctionFactory(GearmanFunctionFactory factory,
            long timeout) {
        if (functionMap.containsKey(factory.getFunctionName())) {
            return;
        }
        FunctionDefinition def = new FunctionDefinition(timeout, factory);
        functionMap.put(factory.getFunctionName(), def);
        sendToAll(generateCanDoPacket(def));
        LOG.debug("Worker " + this + " has registered function " +
                factory.getFunctionName());
    }

    public Set<String> getRegisteredFunctions() {
        HashSet<String> functions = new HashSet<String>();
        for (FunctionDefinition def : functionMap.values()) {
            functions.add(def.factory.getFunctionName());
        }
        return functions;
    }

    public void setWorkerID(String id) throws IllegalArgumentException {
        if (id == null) {
            throw new IllegalArgumentException("Worker ID may not be null");
        }
        this.id = id;
        sendToAll(new GearmanPacketImpl(GearmanPacketMagic.REQ,
                GearmanPacketType.SET_CLIENT_ID, ByteUtils.toUTF8Bytes(id)));
    }

    public void setWorkerID(String id, GearmanJobServerConnection conn) {
        throw new UnsupportedOperationException("Not supported yet.");
    }

    public String getWorkerID() {
        return id;
    }

    public String getWorkerID(GearmanJobServerConnection conn) {
        throw new UnsupportedOperationException("Not supported yet.");
    }

    public void unregisterFunction(String functionName) {
        functionMap.remove(functionName);
        sendToAll(new GearmanPacketImpl(GearmanPacketMagic.REQ,
                GearmanPacketType.CANT_DO, ByteUtils.toUTF8Bytes(functionName)));
        LOG.debug("Worker " + this + " has unregistered function " +
                functionName);
    }

    public void unregisterAll() {
        functionMap.clear();
        sendToAll(new GearmanPacketImpl(GearmanPacketMagic.REQ,
                GearmanPacketType.RESET_ABILITIES, new byte[0]));
    }

    public void stop() {
        state = State.SHUTTINGDOWN;
    }

    public List<Exception> shutdown() {
        return shutDownWorker(false);
    }

    public boolean isRunning() {
        return state.equals(State.RUNNING);
    }

	public void setJobUniqueIdRequired(boolean requiresJobUUID) {
		jobUniqueIdRequired = requiresJobUUID;
	}

	public boolean isJobUniqueIdRequired() {
		return jobUniqueIdRequired;
	}

    private GearmanPacket generateCanDoPacket(FunctionDefinition def) {
        GearmanPacketType pt = GearmanPacketType.CAN_DO;
        byte[] data = null;
        byte[] name = ByteUtils.toUTF8Bytes(def.getFactory().getFunctionName());
        long timeout = def.getTimeout();

        if (timeout > 0) {
            pt = GearmanPacketType.CAN_DO_TIMEOUT;
            byte[] to = ByteUtils.toUTF8Bytes(String.valueOf(timeout));
            data = new byte[name.length + to.length + 1];
            System.arraycopy(name, 0, data, 0, name.length);
            data[name.length] = ByteUtils.NULL;
            System.arraycopy(to, 0, data, name.length + 1, to.length);
        } else {
            data = name;
        }
        return new GearmanPacketImpl(GearmanPacketMagic.REQ, pt, data);
    }

    private void sendToAll(GearmanPacket p) {
        sendToAll(null, p);
    }

    private void sendToAll(GearmanServerResponseHandler handler, GearmanPacket p) {
        GearmanTask gsr = new GearmanTask(handler, p);
        for (GearmanJobServerSession sess : sessionMap.values()) {
            sess.submitTask(gsr);
        }
    }

    /*
     * For the time being this will always return an empty list of
     * exceptions because closeSession does not throw an exception
     */
    private List<Exception> shutDownWorker(boolean completeTasks) {
        LOG.info("Commencing shutdowm of worker " + this);

        ArrayList<Exception> exceptions = new ArrayList<Exception>();

        // This gives any jobs in flight a chance to complete
        if (executorService != null) {
            if (completeTasks) {
                executorService.shutdown();
            } else {
                executorService.shutdownNow();
            }
        }

        for (GearmanJobServerSession sess : sessionMap.values()) {
            sess.closeSession();
        }
        try {
            ioAvailable.close();
        } catch (IOException ioe) {
            LOG.warn("Encountered IOException while closing selector for worker: ", ioe);
        }
        state = State.IDLE;
        LOG.info("Completed shutdowm of worker " + this);

        return exceptions;
    }

    private void addNewJob(GearmanSessionEvent event) {
        byte[] handle, data, functionNameBytes, unique;
        GearmanPacket p = event.getPacket();
        GearmanJobServerSession sess = event.getSession();
        String functionName;
        handle = p.getDataComponentValue(
                GearmanPacket.DataComponentName.JOB_HANDLE);
        functionNameBytes = p.getDataComponentValue(
                GearmanPacket.DataComponentName.FUNCTION_NAME);
        data = p.getDataComponentValue(
                GearmanPacket.DataComponentName.DATA);
        unique = p.getDataComponentValue(DataComponentName.UNIQUE_ID);
        functionName = ByteUtils.fromUTF8Bytes(functionNameBytes);
        FunctionDefinition def = functionMap.get(functionName);
        if (def == null) {
            GearmanTask gsr = new GearmanTask(
                    new GearmanPacketImpl(GearmanPacketMagic.REQ,
                    GearmanPacketType.WORK_FAIL, handle));
            sess.submitTask(gsr);
        } else {
            GearmanFunction function = def.getFactory().getFunction();
            function.setData(data);
            function.setJobHandle(handle);
            function.registerEventListener(sess);
            if (unique != null && unique.length > 0) {
            	function.setUniqueId(unique);
            }
            functionList.add(function);
        }
    }

    private void submitFunction(GearmanFunction fun) {
        try {
            if (executorService == null) {
                fun.call();
            } else {
                executorService.submit(fun);
            }
        } catch (Exception e) {
            LOG.warn("Exception while executing function " + fun.getName(), e);
        }
    }

    private GearmanPacketType getGrabJobPacketType() {
        if (jobUniqueIdRequired) {
            return GearmanPacketType.GRAB_JOB_UNIQ;
        }
        return GearmanPacketType.GRAB_JOB;
    }
}