001 /*
002 * $Id: Runner.java,v 1.20 2012/02/19 17:35:47 davep Exp $
003 *
004 * This file is part of McIDAS-V
005 *
006 * Copyright 2007-2012
007 * Space Science and Engineering Center (SSEC)
008 * University of Wisconsin - Madison
009 * 1225 W. Dayton Street, Madison, WI 53706, USA
010 * https://www.ssec.wisc.edu/mcidas
011 *
012 * All Rights Reserved
013 *
014 * McIDAS-V is built on Unidata's IDV and SSEC's VisAD libraries, and
015 * some McIDAS-V source code is based on IDV and VisAD source code.
016 *
017 * McIDAS-V is free software; you can redistribute it and/or modify
018 * it under the terms of the GNU Lesser Public License as published by
019 * the Free Software Foundation; either version 3 of the License, or
020 * (at your option) any later version.
021 *
022 * McIDAS-V is distributed in the hope that it will be useful,
023 * but WITHOUT ANY WARRANTY; without even the implied warranty of
024 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
025 * GNU Lesser Public License for more details.
026 *
027 * You should have received a copy of the GNU Lesser Public License
028 * along with this program. If not, see http://www.gnu.org/licenses.
029 */
030
031 package edu.wisc.ssec.mcidasv.jython;
032
033 import static edu.wisc.ssec.mcidasv.util.Contract.notNull;
034
035 import java.util.Collections;
036 import java.util.List;
037 import java.util.concurrent.ArrayBlockingQueue;
038 import java.util.concurrent.BlockingQueue;
039
040 import org.python.core.PyObject;
041 import org.python.core.PyStringMap;
042 import org.python.core.PySystemState;
043
044 import org.slf4j.Logger;
045 import org.slf4j.LoggerFactory;
046
047 import edu.wisc.ssec.mcidasv.jython.OutputStreamDemux.OutputType;
048
049 /**
050 * This class represents a specialized {@link Thread} that creates and executes
051 * {@link Command}s. A {@link BlockingQueue} is used to maintain thread safety
052 * and to cause a {@code Runner} to wait when the queue is at capacity or has
053 * no {@code Command}s to execute.
054 */
055 public class Runner extends Thread {
056
057 private static final Logger logger = LoggerFactory.getLogger(Runner.class);
058
059 /** The maximum number of {@link Command}s that can be queued. */
060 private static final int QUEUE_CAPACITY = 10;
061
062 /**
063 * Acts like a global output stream that redirects data to whichever
064 * {@link Console} matches the current thread name.
065 */
066 private final OutputStreamDemux STD_OUT;
067
068 /**
069 * Acts like a global error stream that redirects data to whichever
070 * {@link Console} matches the current thread name.
071 */
072 private final OutputStreamDemux STD_ERR;
073
074 /** Queue of {@link Command}s awaiting execution. */
075 private final BlockingQueue<Command> queue;
076
077 /** */
078 private final Console console;
079
080 /** */
081 private final PySystemState systemState;
082
083 /** The Jython interpreter that will actually run the queued commands. */
084 private final Interpreter interpreter;
085
086 /** Not in use yet. */
087 private boolean interrupted = false;
088
089 /**
090 *
091 *
092 * @param console
093 */
094 public Runner(final Console console) {
095 this(console, Collections.<String>emptyList());
096 }
097
098 /**
099 *
100 *
101 * @param console
102 * @param commands
103 */
104 public Runner(final Console console, final List<String> commands) {
105 notNull(console, commands);
106 this.console = console;
107 this.STD_ERR = new OutputStreamDemux();
108 this.STD_OUT = new OutputStreamDemux();
109 this.queue = new ArrayBlockingQueue<Command>(QUEUE_CAPACITY, true);
110 this.systemState = new PySystemState();
111 this.interpreter = new Interpreter(systemState, STD_OUT, STD_ERR);
112 for (String command : commands) {
113 queueLine(command);
114 }
115 }
116
117 /**
118 * Registers a new callback handler. Currently this only forwards the new
119 * handler to {@link Interpreter#setCallbackHandler(ConsoleCallback)}.
120 *
121 * @param newCallback The callback handler to register.
122 */
123 protected void setCallbackHandler(final ConsoleCallback newCallback) {
124 queueCommand(new RegisterCallbackCommand(console, newCallback));
125 }
126
127 /**
128 * Fetches, copies, and returns the {@link #interpreter}'s local namespace.
129 *
130 * @return Copy of the interpreter's local namespace.
131 */
132 protected PyStringMap copyLocals() {
133 return ((PyStringMap)interpreter.getLocals()).copy();
134 }
135
136 /**
137 * Takes commands out of the queue and executes them. We get a lot of
138 * mileage out of BlockingQueue; it's thread-safe and will block if the
139 * queue is at capacity or empty.
140 *
141 * <p>Please note that this method <b>needs</b> to be the first method that
142 * gets called after creating a {@code Runner}.
143 */
144 public void run() {
145 synchronized (this) {
146 STD_OUT.addStream(console, interpreter, OutputType.NORMAL);
147 STD_ERR.addStream(console, interpreter, OutputType.ERROR);
148 }
149 while (true) {
150 try {
151 // woohoo for BlockingQueue!!
152 Command command = queue.take();
153 command.execute(interpreter);
154 } catch (Exception e) {
155 logger.error("failed to execute", e);
156 }
157 }
158 }
159
160 /**
161 * Queues up a series of Jython statements. Currently each command is
162 * treated as though the current user just entered it; the command appears
163 * in the input along with whatever output the command generates.
164 *
165 * @param source Batched command source. Anything but null is acceptable.
166 * @param batch The actual commands to execute.
167 */
168 public void queueBatch(final String source,
169 final List<String> batch)
170 {
171 queueCommand(new BatchCommand(console, source, batch));
172 }
173
174 /**
175 * Queues up a line of Jython for execution.
176 *
177 * @param line Text of the command.
178 */
179 public void queueLine(final String line) {
180 queueCommand(new LineCommand(console, line));
181 }
182
183 /**
184 * Queues the addition of an object to {@code interpreter}'s local
185 * namespace.
186 *
187 * @param name Object name as it will appear to {@code interpreter}.
188 * @param object Object to put in {@code interpreter}'s local namespace.
189 */
190 public void queueObject(final String name, final Object object) {
191 queueCommand(new InjectCommand(console, name, object));
192 }
193
194 /**
195 * Queues the removal of an object from {@code interpreter}'s local
196 * namespace.
197 *
198 * @param name Name of the object to be removed, <i>as it appears to
199 * Jython</i>.
200 *
201 * @see Runner#queueObject(String, Object)
202 */
203 public void queueRemoval(final String name) {
204 queueCommand(new EjectCommand(console, name));
205 }
206
207 /**
208 * Queues up a Jython file to be run by {@code interpreter}.
209 *
210 * @param name {@code __name__} attribute to use for loading {@code path}.
211 * @param path The path to the Jython file.
212 */
213 public void queueFile(final String name,
214 final String path)
215 {
216 queueCommand(new LoadFileCommand(console, name, path));
217 }
218
219 /**
220 * Queues up a command for execution.
221 *
222 * @param command Command to place in the execution queue.
223 */
224 private void queueCommand(final Command command) {
225 assert command != null : command;
226 try {
227 queue.put(command);
228 } catch (InterruptedException e) {
229 logger.warn("msg='{}' command='{}'", e.getMessage(), command);
230 }
231 }
232
233 @Override public String toString() {
234 return "[Runner@" + Integer.toHexString(hashCode()) +
235 ": interpreter=" + interpreter + ", interrupted=" + interrupted +
236 ", QUEUE_CAPACITY=" + QUEUE_CAPACITY + ", queue=" + queue + "]";
237 }
238 }