/*
 * Decompiled with CFR 0.152.
 */
package org.apache.sysml.runtime.controlprogram.parfor.mqo;

import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.sysml.runtime.controlprogram.parfor.mqo.MergedMRJobInstruction;
import org.apache.sysml.runtime.controlprogram.parfor.stat.Timing;
import org.apache.sysml.runtime.instructions.MRJobInstruction;
import org.apache.sysml.runtime.matrix.JobReturn;
import org.apache.sysml.runtime.matrix.data.Pair;

/**
 * Base class for worker threads that merge MR-job instructions and hand the
 * per-instruction results back to the threads waiting for them.
 *
 * <p>Results are exchanged through {@link #_results}, guarded by this object's
 * monitor: {@link #putJobResults} publishes results and wakes all waiters,
 * {@link #getJobResult} blocks until the result for a given instruction ID is
 * available.
 */
public abstract class PiggybackingWorker
extends Thread {
    protected static final Log LOG = LogFactory.getLog(PiggybackingWorker.class.getName());

    /** Published results keyed by instruction ID; access only while holding this object's monitor. */
    protected HashMap<Long, JobReturn> _results = new HashMap<Long, JobReturn>();

    /**
     * Stop flag set via {@link #setStopped()}. Declared volatile because it is
     * written without holding any lock, so a reader on another thread would
     * otherwise not be guaranteed to observe the update.
     */
    protected volatile boolean _stop = false;

    protected PiggybackingWorker() {
    }

    /**
     * Sets the stop flag. This method does not interrupt or notify any thread.
     */
    public void setStopped() {
        this._stop = true;
    }

    /**
     * Blocks until a result for the given instruction ID has been published
     * and returns it, removing it from the result map.
     *
     * @param instID ID of the instruction whose result is requested
     * @return the result registered for {@code instID}
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public synchronized JobReturn getJobResult(long instID) throws InterruptedException {
        // Check BEFORE waiting: the result may already have been published, in
        // which case no further notification is guaranteed to arrive.
        JobReturn ret = this._results.remove(instID);
        while (ret == null) {
            this.wait();
            ret = this._results.remove(instID);
        }
        return ret;
    }

    /**
     * Publishes a batch of results and wakes up all threads blocked in
     * {@link #getJobResult(long)}.
     *
     * @param ids     instruction IDs, positionally aligned with {@code results}
     * @param results results; the i-th entry belongs to the i-th ID
     */
    protected synchronized void putJobResults(LinkedList<Long> ids, LinkedList<JobReturn> results) {
        // Walk both lists with iterators; indexed get(i) on a LinkedList is O(n) per call.
        Iterator<JobReturn> resultIter = results.iterator();
        for (Long id : ids) {
            this._results.put(id, resultIter.next());
        }
        this.notifyAll();
    }

    /**
     * Merges the instructions of the working set, in order, into as few
     * merged instructions as possible. Each instruction is merged into the
     * current merged instruction if
     * {@code isMergableMRJobInstruction} accepts it; otherwise the current
     * merged instruction is closed and a new one is started. For every input
     * instruction its ID, output offset within the merged instruction, and
     * number of outputs are recorded via {@code addInstructionMetaData}.
     *
     * @param workingSet pairs of (instruction ID, instruction) to merge
     * @return the merged instructions in input order; empty if {@code workingSet} is empty
     * @throws IllegalAccessException declared for compatibility with existing callers and overrides
     */
    protected LinkedList<MergedMRJobInstruction> mergeMRJobInstructions(LinkedList<Pair<Long, MRJobInstruction>> workingSet) throws IllegalAccessException {
        LinkedList<MergedMRJobInstruction> ret = new LinkedList<MergedMRJobInstruction>();
        Timing time = new Timing(true);
        MergedMRJobInstruction minst = new MergedMRJobInstruction();
        for (Pair<Long, MRJobInstruction> pair : workingSet) {
            long instID = pair.getKey();
            MRJobInstruction instVal = pair.getValue();
            int numOutputs = instVal.getOutputs().length;
            if (minst.inst == null) {
                // First instruction overall: it seeds the first merged instruction.
                minst.inst = new MRJobInstruction(instVal);
                minst.addInstructionMetaData(instID, 0, numOutputs);
                continue;
            }
            if (minst.inst.isMergableMRJobInstruction(instVal)) {
                // Outputs of the merged-in instruction start after the existing outputs.
                int offOutputs = minst.inst.getOutputs().length;
                minst.inst.mergeMRJobInstruction(instVal);
                minst.addInstructionMetaData(instID, offOutputs, numOutputs);
                continue;
            }
            // Not mergable: close the current merged instruction and start a new one.
            ret.add(minst);
            minst = new MergedMRJobInstruction();
            minst.inst = new MRJobInstruction(instVal);
            minst.addInstructionMetaData(instID, 0, numOutputs);
        }
        // Add the last open merged instruction; skip it for an empty working
        // set, where it would carry no underlying instruction.
        if (minst.inst != null) {
            ret.add(minst);
        }
        LOG.info("Merged MR-Job instructions: " + workingSet.size() + " --> " + ret.size() + " in " + time.stop() + "ms.");
        return ret;
    }
}

