001    /*
002     * $Id: Runner.java,v 1.20 2012/02/19 17:35:47 davep Exp $
003     *
004     * This file is part of McIDAS-V
005     *
006     * Copyright 2007-2012
007     * Space Science and Engineering Center (SSEC)
008     * University of Wisconsin - Madison
009     * 1225 W. Dayton Street, Madison, WI 53706, USA
010     * https://www.ssec.wisc.edu/mcidas
011     * 
012     * All Rights Reserved
013     * 
014     * McIDAS-V is built on Unidata's IDV and SSEC's VisAD libraries, and
015     * some McIDAS-V source code is based on IDV and VisAD source code.  
016     * 
017     * McIDAS-V is free software; you can redistribute it and/or modify
018     * it under the terms of the GNU Lesser Public License as published by
019     * the Free Software Foundation; either version 3 of the License, or
020     * (at your option) any later version.
021     * 
022     * McIDAS-V is distributed in the hope that it will be useful,
023     * but WITHOUT ANY WARRANTY; without even the implied warranty of
024     * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
025     * GNU Lesser Public License for more details.
026     * 
027     * You should have received a copy of the GNU Lesser Public License
028     * along with this program.  If not, see http://www.gnu.org/licenses.
029     */
030    
031    package edu.wisc.ssec.mcidasv.jython;
032    
033    import static edu.wisc.ssec.mcidasv.util.Contract.notNull;
034    
035    import java.util.Collections;
036    import java.util.List;
037    import java.util.concurrent.ArrayBlockingQueue;
038    import java.util.concurrent.BlockingQueue;
039    
040    import org.python.core.PyObject;
041    import org.python.core.PyStringMap;
042    import org.python.core.PySystemState;
043    
044    import org.slf4j.Logger;
045    import org.slf4j.LoggerFactory;
046    
047    import edu.wisc.ssec.mcidasv.jython.OutputStreamDemux.OutputType;
048    
049    /**
050     * This class represents a specialized {@link Thread} that creates and executes 
051     * {@link Command}s. A {@link BlockingQueue} is used to maintain thread safety
052     * and to cause a {@code Runner} to wait when the queue is at capacity or has
053     * no {@code Command}s to execute.
054     */
055    public class Runner extends Thread {
056    
057        private static final Logger logger = LoggerFactory.getLogger(Runner.class);
058    
059        /** The maximum number of {@link Command}s that can be queued. */
060        private static final int QUEUE_CAPACITY = 10;
061    
062        /** 
063         * Acts like a global output stream that redirects data to whichever 
064         * {@link Console} matches the current thread name.
065         */
066        private final OutputStreamDemux STD_OUT;
067    
068        /** 
069         * Acts like a global error stream that redirects data to whichever 
070         * {@link Console} matches the current thread name.
071         */
072        private final OutputStreamDemux STD_ERR;
073    
074        /** Queue of {@link Command}s awaiting execution. */
075        private final BlockingQueue<Command> queue;
076    
077        /** */
078        private final Console console;
079    
080        /** */
081        private final PySystemState systemState;
082    
083        /** The Jython interpreter that will actually run the queued commands. */
084        private final Interpreter interpreter;
085    
086        /** Not in use yet. */
087        private boolean interrupted = false;
088    
089        /**
090         * 
091         * 
092         * @param console
093         */
094        public Runner(final Console console) {
095            this(console, Collections.<String>emptyList());
096        }
097    
098        /**
099         * 
100         * 
101         * @param console
102         * @param commands
103         */
104        public Runner(final Console console, final List<String> commands) {
105            notNull(console, commands);
106            this.console = console;
107            this.STD_ERR = new OutputStreamDemux();
108            this.STD_OUT = new OutputStreamDemux();
109            this.queue = new ArrayBlockingQueue<Command>(QUEUE_CAPACITY, true);
110            this.systemState = new PySystemState();
111            this.interpreter = new Interpreter(systemState, STD_OUT, STD_ERR);
112            for (String command : commands) {
113                queueLine(command);
114            }
115        }
116    
117        /**
118         * Registers a new callback handler. Currently this only forwards the new
119         * handler to {@link Interpreter#setCallbackHandler(ConsoleCallback)}.
120         * 
121         * @param newCallback The callback handler to register.
122         */
123        protected void setCallbackHandler(final ConsoleCallback newCallback) {
124            queueCommand(new RegisterCallbackCommand(console, newCallback));
125        }
126    
127        /**
128         * Fetches, copies, and returns the {@link #interpreter}'s local namespace.
129         * 
130         * @return Copy of the interpreter's local namespace.
131         */
132        protected PyStringMap copyLocals() {
133            return ((PyStringMap)interpreter.getLocals()).copy();
134        }
135    
136        /**
137         * Takes commands out of the queue and executes them. We get a lot of 
138         * mileage out of BlockingQueue; it's thread-safe and will block if the 
139         * queue is at capacity or empty.
140         * 
141         * <p>Please note that this method <b>needs</b> to be the first method that
142         * gets called after creating a {@code Runner}.
143         */
144        public void run() {
145            synchronized (this) {
146                STD_OUT.addStream(console, interpreter, OutputType.NORMAL);
147                STD_ERR.addStream(console, interpreter, OutputType.ERROR);
148            }
149            while (true) {
150                try {
151                    // woohoo for BlockingQueue!!
152                    Command command = queue.take();
153                    command.execute(interpreter);
154                } catch (Exception e) {
155                    logger.error("failed to execute", e);
156                }
157            }
158        }
159    
160        /**
161         * Queues up a series of Jython statements. Currently each command is 
162         * treated as though the current user just entered it; the command appears
163         * in the input along with whatever output the command generates.
164         * 
165         * @param source Batched command source. Anything but null is acceptable.
166         * @param batch The actual commands to execute.
167         */
168        public void queueBatch(final String source,
169            final List<String> batch) 
170        {
171            queueCommand(new BatchCommand(console, source, batch));
172        }
173    
174        /**
175         * Queues up a line of Jython for execution.
176         * 
177         * @param line Text of the command.
178         */
179        public void queueLine(final String line) {
180            queueCommand(new LineCommand(console, line));
181        }
182    
183        /**
184         * Queues the addition of an object to {@code interpreter}'s local 
185         * namespace.
186         *
187         * @param name Object name as it will appear to {@code interpreter}.
188         * @param object Object to put in {@code interpreter}'s local namespace.
189         */
190        public void queueObject(final String name, final Object object) {
191            queueCommand(new InjectCommand(console, name, object));
192        }
193    
194        /**
195         * Queues the removal of an object from {@code interpreter}'s local 
196         * namespace. 
197         * 
198         * @param name Name of the object to be removed, <i>as it appears to
199         * Jython</i>.
200         * 
201         * @see Runner#queueObject(String, Object)
202         */
203        public void queueRemoval(final String name) {
204            queueCommand(new EjectCommand(console, name));
205        }
206    
207        /**
208         * Queues up a Jython file to be run by {@code interpreter}.
209         *
210         * @param name {@code __name__} attribute to use for loading {@code path}.
211         * @param path The path to the Jython file.
212         */
213        public void queueFile(final String name,
214            final String path) 
215        {
216            queueCommand(new LoadFileCommand(console, name, path));
217        }
218    
219        /**
220         * Queues up a command for execution.
221         * 
222         * @param command Command to place in the execution queue.
223         */
224        private void queueCommand(final Command command) {
225            assert command != null : command;
226            try {
227                queue.put(command);
228            } catch (InterruptedException e) {
229                logger.warn("msg='{}' command='{}'", e.getMessage(), command);
230            }
231        }
232    
233        @Override public String toString() {
234            return "[Runner@" + Integer.toHexString(hashCode()) + 
235                ": interpreter=" + interpreter + ", interrupted=" + interrupted +
236                ", QUEUE_CAPACITY=" + QUEUE_CAPACITY + ", queue=" + queue + "]"; 
237        }
238    }