001 /* 002 * $Id: Runner.java,v 1.20 2012/02/19 17:35:47 davep Exp $ 003 * 004 * This file is part of McIDAS-V 005 * 006 * Copyright 2007-2012 007 * Space Science and Engineering Center (SSEC) 008 * University of Wisconsin - Madison 009 * 1225 W. Dayton Street, Madison, WI 53706, USA 010 * https://www.ssec.wisc.edu/mcidas 011 * 012 * All Rights Reserved 013 * 014 * McIDAS-V is built on Unidata's IDV and SSEC's VisAD libraries, and 015 * some McIDAS-V source code is based on IDV and VisAD source code. 016 * 017 * McIDAS-V is free software; you can redistribute it and/or modify 018 * it under the terms of the GNU Lesser Public License as published by 019 * the Free Software Foundation; either version 3 of the License, or 020 * (at your option) any later version. 021 * 022 * McIDAS-V is distributed in the hope that it will be useful, 023 * but WITHOUT ANY WARRANTY; without even the implied warranty of 024 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 025 * GNU Lesser Public License for more details. 026 * 027 * You should have received a copy of the GNU Lesser Public License 028 * along with this program. If not, see http://www.gnu.org/licenses. 029 */ 030 031 package edu.wisc.ssec.mcidasv.jython; 032 033 import static edu.wisc.ssec.mcidasv.util.Contract.notNull; 034 035 import java.util.Collections; 036 import java.util.List; 037 import java.util.concurrent.ArrayBlockingQueue; 038 import java.util.concurrent.BlockingQueue; 039 040 import org.python.core.PyObject; 041 import org.python.core.PyStringMap; 042 import org.python.core.PySystemState; 043 044 import org.slf4j.Logger; 045 import org.slf4j.LoggerFactory; 046 047 import edu.wisc.ssec.mcidasv.jython.OutputStreamDemux.OutputType; 048 049 /** 050 * This class represents a specialized {@link Thread} that creates and executes 051 * {@link Command}s. A {@link BlockingQueue} is used to maintain thread safety 052 * and to cause a {@code Runner} to wait when the queue is at capacity or has 053 * no {@code Command}s to execute. 054 */ 055 public class Runner extends Thread { 056 057 private static final Logger logger = LoggerFactory.getLogger(Runner.class); 058 059 /** The maximum number of {@link Command}s that can be queued. */ 060 private static final int QUEUE_CAPACITY = 10; 061 062 /** 063 * Acts like a global output stream that redirects data to whichever 064 * {@link Console} matches the current thread name. 065 */ 066 private final OutputStreamDemux STD_OUT; 067 068 /** 069 * Acts like a global error stream that redirects data to whichever 070 * {@link Console} matches the current thread name. 071 */ 072 private final OutputStreamDemux STD_ERR; 073 074 /** Queue of {@link Command}s awaiting execution. */ 075 private final BlockingQueue<Command> queue; 076 077 /** */ 078 private final Console console; 079 080 /** */ 081 private final PySystemState systemState; 082 083 /** The Jython interpreter that will actually run the queued commands. */ 084 private final Interpreter interpreter; 085 086 /** Not in use yet. */ 087 private boolean interrupted = false; 088 089 /** 090 * 091 * 092 * @param console 093 */ 094 public Runner(final Console console) { 095 this(console, Collections.<String>emptyList()); 096 } 097 098 /** 099 * 100 * 101 * @param console 102 * @param commands 103 */ 104 public Runner(final Console console, final List<String> commands) { 105 notNull(console, commands); 106 this.console = console; 107 this.STD_ERR = new OutputStreamDemux(); 108 this.STD_OUT = new OutputStreamDemux(); 109 this.queue = new ArrayBlockingQueue<Command>(QUEUE_CAPACITY, true); 110 this.systemState = new PySystemState(); 111 this.interpreter = new Interpreter(systemState, STD_OUT, STD_ERR); 112 for (String command : commands) { 113 queueLine(command); 114 } 115 } 116 117 /** 118 * Registers a new callback handler. Currently this only forwards the new 119 * handler to {@link Interpreter#setCallbackHandler(ConsoleCallback)}. 120 * 121 * @param newCallback The callback handler to register. 122 */ 123 protected void setCallbackHandler(final ConsoleCallback newCallback) { 124 queueCommand(new RegisterCallbackCommand(console, newCallback)); 125 } 126 127 /** 128 * Fetches, copies, and returns the {@link #interpreter}'s local namespace. 129 * 130 * @return Copy of the interpreter's local namespace. 131 */ 132 protected PyStringMap copyLocals() { 133 return ((PyStringMap)interpreter.getLocals()).copy(); 134 } 135 136 /** 137 * Takes commands out of the queue and executes them. We get a lot of 138 * mileage out of BlockingQueue; it's thread-safe and will block if the 139 * queue is at capacity or empty. 140 * 141 * <p>Please note that this method <b>needs</b> to be the first method that 142 * gets called after creating a {@code Runner}. 143 */ 144 public void run() { 145 synchronized (this) { 146 STD_OUT.addStream(console, interpreter, OutputType.NORMAL); 147 STD_ERR.addStream(console, interpreter, OutputType.ERROR); 148 } 149 while (true) { 150 try { 151 // woohoo for BlockingQueue!! 152 Command command = queue.take(); 153 command.execute(interpreter); 154 } catch (Exception e) { 155 logger.error("failed to execute", e); 156 } 157 } 158 } 159 160 /** 161 * Queues up a series of Jython statements. Currently each command is 162 * treated as though the current user just entered it; the command appears 163 * in the input along with whatever output the command generates. 164 * 165 * @param source Batched command source. Anything but null is acceptable. 166 * @param batch The actual commands to execute. 167 */ 168 public void queueBatch(final String source, 169 final List<String> batch) 170 { 171 queueCommand(new BatchCommand(console, source, batch)); 172 } 173 174 /** 175 * Queues up a line of Jython for execution. 176 * 177 * @param line Text of the command. 178 */ 179 public void queueLine(final String line) { 180 queueCommand(new LineCommand(console, line)); 181 } 182 183 /** 184 * Queues the addition of an object to {@code interpreter}'s local 185 * namespace. 186 * 187 * @param name Object name as it will appear to {@code interpreter}. 188 * @param object Object to put in {@code interpreter}'s local namespace. 189 */ 190 public void queueObject(final String name, final Object object) { 191 queueCommand(new InjectCommand(console, name, object)); 192 } 193 194 /** 195 * Queues the removal of an object from {@code interpreter}'s local 196 * namespace. 197 * 198 * @param name Name of the object to be removed, <i>as it appears to 199 * Jython</i>. 200 * 201 * @see Runner#queueObject(String, Object) 202 */ 203 public void queueRemoval(final String name) { 204 queueCommand(new EjectCommand(console, name)); 205 } 206 207 /** 208 * Queues up a Jython file to be run by {@code interpreter}. 209 * 210 * @param name {@code __name__} attribute to use for loading {@code path}. 211 * @param path The path to the Jython file. 212 */ 213 public void queueFile(final String name, 214 final String path) 215 { 216 queueCommand(new LoadFileCommand(console, name, path)); 217 } 218 219 /** 220 * Queues up a command for execution. 221 * 222 * @param command Command to place in the execution queue. 223 */ 224 private void queueCommand(final Command command) { 225 assert command != null : command; 226 try { 227 queue.put(command); 228 } catch (InterruptedException e) { 229 logger.warn("msg='{}' command='{}'", e.getMessage(), command); 230 } 231 } 232 233 @Override public String toString() { 234 return "[Runner@" + Integer.toHexString(hashCode()) + 235 ": interpreter=" + interpreter + ", interrupted=" + interrupted + 236 ", QUEUE_CAPACITY=" + QUEUE_CAPACITY + ", queue=" + queue + "]"; 237 } 238 }