001/*
002 * This file is part of McIDAS-V
003 *
004 * Copyright 2007-2015
005 * Space Science and Engineering Center (SSEC)
006 * University of Wisconsin - Madison
007 * 1225 W. Dayton Street, Madison, WI 53706, USA
008 * https://www.ssec.wisc.edu/mcidas
009 * 
010 * All Rights Reserved
011 * 
012 * McIDAS-V is built on Unidata's IDV and SSEC's VisAD libraries, and
013 * some McIDAS-V source code is based on IDV and VisAD source code.  
014 * 
015 * McIDAS-V is free software; you can redistribute it and/or modify
016 * it under the terms of the GNU Lesser Public License as published by
017 * the Free Software Foundation; either version 3 of the License, or
018 * (at your option) any later version.
019 * 
020 * McIDAS-V is distributed in the hope that it will be useful,
021 * but WITHOUT ANY WARRANTY; without even the implied warranty of
022 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
023 * GNU Lesser Public License for more details.
024 * 
025 * You should have received a copy of the GNU Lesser Public License
026 * along with this program.  If not, see http://www.gnu.org/licenses.
027 */
028
029package edu.wisc.ssec.mcidasv.jython;
030
031import static java.util.Objects.requireNonNull;
032
033import java.util.Collections;
034import java.util.List;
035import java.util.concurrent.ArrayBlockingQueue;
036import java.util.concurrent.BlockingQueue;
037
038import org.python.core.PyObject;
039import org.python.core.PyStringMap;
040import org.python.core.PySystemState;
041
042import org.slf4j.Logger;
043import org.slf4j.LoggerFactory;
044
045import edu.wisc.ssec.mcidasv.jython.OutputStreamDemux.OutputType;
046
047/**
048 * This class represents a specialized {@link Thread} that creates and executes 
049 * {@link Command}s. A {@link BlockingQueue} is used to maintain thread safety
050 * and to cause a {@code Runner} to wait when the queue is at capacity or has
051 * no {@code Command}s to execute.
052 */
053public class Runner extends Thread {
054
055    private static final Logger logger = LoggerFactory.getLogger(Runner.class);
056
057    /** The maximum number of {@link Command}s that can be queued. */
058    private static final int QUEUE_CAPACITY = 10;
059
060    /** 
061     * Acts like a global output stream that redirects data to whichever 
062     * {@link Console} matches the current thread name.
063     */
064    private final OutputStreamDemux STD_OUT;
065
066    /** 
067     * Acts like a global error stream that redirects data to whichever 
068     * {@link Console} matches the current thread name.
069     */
070    private final OutputStreamDemux STD_ERR;
071
072    /** Queue of {@link Command}s awaiting execution. */
073    private final BlockingQueue<Command> queue;
074
075    /** */
076    private final Console console;
077
078    /** */
079    private final PySystemState systemState;
080
081    /** The Jython interpreter that will actually run the queued commands. */
082    private final Interpreter interpreter;
083
084    /** Not in use yet. */
085    private boolean interrupted = false;
086
087    /**
088     * 
089     * 
090     * @param console
091     */
092    public Runner(final Console console) {
093        this(console, Collections.<String>emptyList());
094    }
095
096    /**
097     * 
098     * 
099     * @param console
100     * @param commands
101     */
102    public Runner(final Console console, final List<String> commands) {
103        requireNonNull(console);
104        requireNonNull(commands);
105        this.console = console;
106        this.STD_ERR = new OutputStreamDemux();
107        this.STD_OUT = new OutputStreamDemux();
108        this.queue = new ArrayBlockingQueue<>(QUEUE_CAPACITY, true);
109        this.systemState = new PySystemState();
110        this.interpreter = new Interpreter(systemState, STD_OUT, STD_ERR);
111        for (String command : commands) {
112            queueLine(command);
113        }
114    }
115
116    /**
117     * Registers a new callback handler. Currently this only forwards the new
118     * handler to {@link Interpreter#setCallbackHandler(ConsoleCallback)}.
119     * 
120     * @param newCallback The callback handler to register.
121     */
122    protected void setCallbackHandler(final ConsoleCallback newCallback) {
123        queueCommand(new RegisterCallbackCommand(console, newCallback));
124    }
125
126    /**
127     * Fetches, copies, and returns the {@link #interpreter}'s local namespace.
128     * 
129     * @return Copy of the interpreter's local namespace.
130     */
131    protected PyStringMap copyLocals() {
132        return ((PyStringMap)interpreter.getLocals()).copy();
133    }
134
135    /**
136     * Takes commands out of the queue and executes them. We get a lot of 
137     * mileage out of BlockingQueue; it's thread-safe and will block if the 
138     * queue is at capacity or empty.
139     * 
140     * <p>Please note that this method <b>needs</b> to be the first method that
141     * gets called after creating a {@code Runner}.
142     */
143    public void run() {
144        synchronized (this) {
145            STD_OUT.addStream(console, interpreter, OutputType.NORMAL);
146            STD_ERR.addStream(console, interpreter, OutputType.ERROR);
147        }
148        while (true) {
149            try {
150                // woohoo for BlockingQueue!!
151                Command command = queue.take();
152                command.execute(interpreter);
153            } catch (Exception e) {
154                logger.error("failed to execute", e);
155            }
156        }
157    }
158
159    /**
160     * Queues up a series of Jython statements. Currently each command is 
161     * treated as though the current user just entered it; the command appears
162     * in the input along with whatever output the command generates.
163     * 
164     * @param source Batched command source. Anything but null is acceptable.
165     * @param batch The actual commands to execute.
166     */
167    public void queueBatch(final String source,
168        final List<String> batch) 
169    {
170        queueCommand(new BatchCommand(console, source, batch));
171    }
172
173    /**
174     * Queues up a line of Jython for execution.
175     * 
176     * @param line Text of the command.
177     */
178    public void queueLine(final String line) {
179        queueCommand(new LineCommand(console, line));
180    }
181
182    /**
183     * Queues the addition of an object to {@code interpreter}'s local 
184     * namespace.
185     *
186     * @param name Object name as it will appear to {@code interpreter}.
187     * @param object Object to put in {@code interpreter}'s local namespace.
188     */
189    public void queueObject(final String name, final Object object) {
190        queueCommand(new InjectCommand(console, name, object));
191    }
192
193    /**
194     * Queues the removal of an object from {@code interpreter}'s local 
195     * namespace. 
196     * 
197     * @param name Name of the object to be removed, <i>as it appears to
198     * Jython</i>.
199     * 
200     * @see Runner#queueObject(String, Object)
201     */
202    public void queueRemoval(final String name) {
203        queueCommand(new EjectCommand(console, name));
204    }
205
206    /**
207     * Queues up a Jython file to be run by {@code interpreter}.
208     *
209     * @param name {@code __name__} attribute to use for loading {@code path}.
210     * @param path The path to the Jython file.
211     */
212    public void queueFile(final String name,
213        final String path) 
214    {
215        queueCommand(new LoadFileCommand(console, name, path));
216    }
217
218    /**
219     * Queues up a command for execution.
220     * 
221     * @param command Command to place in the execution queue.
222     */
223    private void queueCommand(final Command command) {
224        assert command != null : command;
225        try {
226            queue.put(command);
227        } catch (InterruptedException e) {
228            logger.warn("msg='{}' command='{}'", e.getMessage(), command);
229        }
230    }
231
232    @Override public String toString() {
233        return "[Runner@" + Integer.toHexString(hashCode()) + 
234            ": interpreter=" + interpreter + ", interrupted=" + interrupted +
235            ", QUEUE_CAPACITY=" + QUEUE_CAPACITY + ", queue=" + queue + "]"; 
236    }
237}