AbstractGearmanFunction.java

/*
 * Copyright (C) 2013 by Eric Lambert <eric.d.lambert@gmail.com>
 * Use and distribution licensed under the BSD license.  See
 * the COPYING file in the parent directory for full text.
 */
package org.gearman.worker;

import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

import org.gearman.client.GearmanIOEventListener;
import org.gearman.client.GearmanJobResult;
import org.gearman.common.GearmanPacket;
import org.gearman.common.GearmanPacketImpl;
import org.gearman.common.GearmanPacketMagic;
import org.gearman.common.GearmanPacketType;
import org.gearman.util.ByteUtils;

public abstract class AbstractGearmanFunction implements GearmanFunction {

    private static final String NULL_JOB_RESULT = "executeFunction call returned null GearmanJobResult";
    private static final byte[] NULL_JOB_RESULT_BYTES = ByteUtils
	    .toAsciiBytes(NULL_JOB_RESULT);

    protected final String name;
    protected Object data;
    protected byte[] jobHandle;
    protected Set<GearmanIOEventListener> listeners;
    protected byte [] uniqueId;

    public AbstractGearmanFunction() {
        this(null);
    }

    public AbstractGearmanFunction(String name) {
        listeners = new HashSet<GearmanIOEventListener>();
        jobHandle = new byte[0];
        if (name == null) {
            this.name = this.getClass().getCanonicalName();
        } else {
            this.name = name;
        }
    }

    public String getName() {
        return name;
    }

    public void setData(Object data) {
        this.data = data;
    }

    public void setJobHandle(byte[] handle) throws IllegalArgumentException {
        if (handle == null) {
            throw new IllegalArgumentException("handle can not be null");
        }
        if (handle.length == 0) {
            throw new IllegalArgumentException("handle can not be empty");
        }
        jobHandle = new byte[handle.length];
        System.arraycopy(handle, 0, jobHandle, 0, handle.length);
    }

    public byte[] getJobHandle() {
        byte[] rt = new byte[jobHandle.length];
        System.arraycopy(jobHandle, 0, rt, 0, jobHandle.length);
        return rt;
    }

    public void registerEventListener(GearmanIOEventListener listener)
            throws IllegalArgumentException {
        if (listener == null) {
            throw new IllegalArgumentException("listener can not be null");
        }
        listeners.add(listener);
    }

    public void fireEvent(GearmanPacket event)
            throws IllegalArgumentException {
        if (event == null) {
            throw new IllegalArgumentException("event can not be null");
        }
        for (GearmanIOEventListener listener : listeners) {
            listener.handleGearmanIOEvent(event);
        }
    }

    public void sendData(byte[] data) {
        fireEvent(new GearmanPacketImpl(GearmanPacketMagic.REQ,
                GearmanPacketType.WORK_DATA,
                GearmanPacketImpl.generatePacketData(jobHandle, data)));

    }

    public void sendWarning(byte[] warning) {
        fireEvent(new GearmanPacketImpl(GearmanPacketMagic.REQ,
                GearmanPacketType.WORK_WARNING,
                GearmanPacketImpl.generatePacketData(jobHandle, warning)));
    }

    public void sendException(byte[] exception) {
        fireEvent(new GearmanPacketImpl(GearmanPacketMagic.REQ,
                GearmanPacketType.WORK_EXCEPTION,
                GearmanPacketImpl.generatePacketData(jobHandle, exception)));
    }

    public void sendStatus(int denominator, int numerator) {
        fireEvent(new GearmanPacketImpl(GearmanPacketMagic.REQ,
                GearmanPacketType.WORK_STATUS,
                GearmanPacketImpl.generatePacketData(jobHandle,
                ByteUtils.toUTF8Bytes(String.valueOf(numerator)),
                ByteUtils.toUTF8Bytes(String.valueOf(denominator)))));
    }

    public abstract GearmanJobResult executeFunction();

    public GearmanJobResult call() {
        GearmanPacket event = null;
        GearmanJobResult result = null;
        Exception thrown = null;
        try {
            result = executeFunction();
        } catch (Exception e) {
            thrown = e;
        }
        if (result == null) {
            byte[] exceptionBytes = null;
            RuntimeException toThrow = null;
            if (thrown != null) {
                ByteArrayOutputStream baos = new ByteArrayOutputStream();
                thrown.printStackTrace(new PrintStream(baos));
                exceptionBytes = baos.toByteArray();
                toThrow = new RuntimeException(thrown);
            } else {
                exceptionBytes = NULL_JOB_RESULT_BYTES;
                toThrow = new IllegalStateException(NULL_JOB_RESULT);
            }
            fireEvent(new GearmanPacketImpl(GearmanPacketMagic.REQ,
                    GearmanPacketType.WORK_EXCEPTION,
                    GearmanPacketImpl.generatePacketData(jobHandle,
                            exceptionBytes)));
            throw toThrow;
        }
        byte[] warnings = result.getWarnings();
        if (warnings.length > 0) {
            sendWarning(warnings);
        }

        if (result.jobSucceeded()) {
            event = new GearmanPacketImpl(GearmanPacketMagic.REQ,
                    GearmanPacketType.WORK_COMPLETE,
                    GearmanPacketImpl.generatePacketData(jobHandle,
                            result.getResults()));
        } else {
            event = new GearmanPacketImpl(GearmanPacketMagic.REQ,
                    GearmanPacketType.WORK_FAIL, jobHandle);

        }
        fireEvent(event);
        return result;
    }

    public void setUniqueId(byte[] uuid) {
        if (uuid == null) {
            throw new IllegalArgumentException("UUID can not be null");
        }
        this.uniqueId = Arrays.copyOf(uuid, uuid.length);
    }

    public byte[] getUniqueId() {
        if (this.uniqueId != null) {
            return Arrays.copyOf(this.uniqueId, this.uniqueId.length);
        }
        return null;
    }
}