3 This module defines the Serial and Parallel classes that execute tasks to
4 complete a build. The Jobs class provides a higher level interface to start,
5 stop, and wait on jobs.
10 # Copyright (c) 2001, 2002, 2003, 2004, 2005, 2006, 2007, 2008 The SCons Foundation
12 # Permission is hereby granted, free of charge, to any person obtaining
13 # a copy of this software and associated documentation files (the
14 # "Software"), to deal in the Software without restriction, including
15 # without limitation the rights to use, copy, modify, merge, publish,
16 # distribute, sublicense, and/or sell copies of the Software, and to
17 # permit persons to whom the Software is furnished to do so, subject to
18 # the following conditions:
20 # The above copyright notice and this permission notice shall be included
21 # in all copies or substantial portions of the Software.
23 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY
24 # KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
25 # WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
26 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
27 # LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
28 # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
29 # WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
__revision__ = "src/engine/SCons/Job.py 3842 2008/12/20 22:59:52 scons"

# The default stack size (in kilobytes) of the threads used to execute
# jobs.
#
# We use a stack size of 256 kilobytes. The default on some platforms
# is too large and prevents us from creating enough threads to fully
# parallelize the build. For example, the default stack size on linux
# is too large.

# Stack size requested explicitly by the user; None means "fall back to
# default_stack_size" (see Jobs.__init__, which resolves the two).
explicit_stack_size = None
# Fallback worker-thread stack size, in kilobytes (see comment above).
default_stack_size = 256

# Error string attached to the BuildError raised when a signal
# interrupts the build (used by Serial.start, Worker.run, Parallel.start).
interrupt_msg = 'Build interrupted.'
class InterruptState:
    """A shared, mutable flag recording whether the build was interrupted.

    One instance is shared between a job and its signal handler / worker
    threads: the handler calls set() to raise the flag, and workers test
    it by calling the instance itself.
    """

    def __init__(self):
        # False until a signal handler marks the build as interrupted.
        self.interrupted = False

    def set(self):
        """Mark the build as interrupted."""
        self.interrupted = True

    def __call__(self):
        """Return True if the build has been marked interrupted."""
        return self.interrupted
65 """An instance of this class initializes N jobs, and provides
66 methods for starting, stopping, and waiting on all N jobs.
def __init__(self, num, taskmaster):
    """Create 'num' jobs using the given taskmaster.

    If 'num' is 1 or less, then a serial job will be used,
    otherwise a parallel job with 'num' worker threads will
    be used.

    The 'num_jobs' attribute will be set to the actual number of jobs
    allocated. If more than one job is requested but the Parallel
    class can't do it, it gets reset to 1. Wrapping interfaces that
    care should check the value of 'num_jobs' after initialization.
    """
    # Resolve the worker-thread stack size: an explicitly configured
    # value wins; otherwise fall back to the module-level default
    # (both are in kilobytes).
    stack_size = explicit_stack_size
    if stack_size is None:
        stack_size = default_stack_size

    # NOTE(review): the control flow originally surrounding the two
    # assignments below (attempting Parallel when more than one job is
    # requested and falling back to Serial, per the docstring above) is
    # not visible in this chunk -- confirm against the full file.
    self.job = Parallel(taskmaster, num, stack_size)
    self.job = Serial(taskmaster)
def run(self, postfunc=lambda: None):
    """Run the jobs.

    postfunc() will be invoked after the jobs have run. It will be
    invoked even if the jobs are interrupted by a keyboard
    interrupt (well, in fact by a signal such as either SIGINT,
    SIGTERM or SIGHUP). The execution of postfunc() is protected
    against keyboard interrupts and is guaranteed to run to
    completion.
    """
    # Install SIGINT/SIGTERM/SIGHUP handlers for the duration of the run.
    self._setup_sig_handler()
    # NOTE(review): the try/finally that starts self.job and invokes
    # postfunc() is not visible in this chunk.
    # Restore whatever handlers were installed before run() began.
    self._reset_sig_handler()
def were_interrupted(self):
    """Return True if a signal interrupted the jobs, False otherwise."""
    was_hit = self.job.interrupted()
    return was_hit
def _setup_sig_handler(self):
    """Setup an interrupt handler so that SCons can shutdown cleanly in
    various conditions:

      a) SIGINT: Keyboard interrupt
      b) SIGTERM: kill or system shutdown
      c) SIGHUP: Controlling shell exiting

    We handle all of these cases by stopping the taskmaster. It
    turns out that it is very difficult to stop the build process
    by throwing asynchronously an exception such as
    KeyboardInterrupt. For example, the python Condition
    variables (threading.Condition) and Queue's do not seem to be
    asynchronous-exception-safe. It would require adding a whole
    bunch of try/finally block and except KeyboardInterrupt all
    over the place.

    Note also that we have to be careful to handle the case when
    SCons forks before executing another process. In that case, we
    want the child to exit immediately.
    """
    # parentpid is captured at handler-definition time so that a forked
    # child, which inherits this handler, can recognize that it is not
    # the parent and avoid stopping the parent's taskmaster.
    def handler(signum, stack, self=self, parentpid=os.getpid()):
        if os.getpid() == parentpid:
            # Parent process: stop handing out new tasks and raise the
            # shared interrupt flag so in-flight work can bail out.
            self.job.taskmaster.stop()
            self.job.interrupted.set()
        # NOTE(review): the child-process branch (exit immediately, per
        # the docstring above) is not visible in this chunk.

    # Save the previous handlers so _reset_sig_handler() can restore them.
    self.old_sigint = signal.signal(signal.SIGINT, handler)
    self.old_sigterm = signal.signal(signal.SIGTERM, handler)
    # SIGHUP does not exist on all platforms (e.g. Windows), hence the
    # AttributeError guard below.  NOTE(review): the 'try:' opening this
    # guard is not visible in this chunk.
    self.old_sighup = signal.signal(signal.SIGHUP, handler)
    except AttributeError:
def _reset_sig_handler(self):
    """Restore the signal handlers to their previous state (before the
    call to _setup_sig_handler())."""
    # Reinstall the handlers captured by _setup_sig_handler().
    signal.signal(signal.SIGINT, self.old_sigint)
    signal.signal(signal.SIGTERM, self.old_sigterm)
    # SIGHUP may not exist on this platform; the AttributeError guard
    # mirrors _setup_sig_handler().  NOTE(review): the 'try:' opening
    # this guard is not visible in this chunk.
    signal.signal(signal.SIGHUP, self.old_sighup)
    except AttributeError:
165 """This class is used to execute tasks in series, and is more efficient
166 than Parallel, but is only appropriate for non-parallel builds. Only
167 one instance of this class should be in existence at a time.
169 This class is not thread safe.
def __init__(self, taskmaster):
    """Set up a serial job driven by the given taskmaster.

    The taskmaster's next_task() method should return the next task
    that needs to be executed, or None if there are no more tasks. The
    taskmaster's executed() method will be called for each task when it
    is successfully executed, or failed() will be called if it failed
    to execute (e.g. execute() raised an exception).
    """
    # Fresh interrupt flag; consulted via self.interrupted() in start().
    self.interrupted = InterruptState()
    self.taskmaster = taskmaster
"""Start the job. This will begin pulling tasks from the taskmaster
and executing them, and return when there are no more tasks. If a task
fails to execute (i.e. execute() raises an exception), then the job
will stop."""
# NOTE(review): the enclosing 'def start(self):' line and the loop /
# try-except structure of this method are not visible in this chunk;
# the comments below annotate only the visible statements.
# Ask the taskmaster for the next ready task.
task = self.taskmaster.next_task()
# Skip execution when the task reports it does not need it.
if task.needs_execute():
# A signal arrived during the build: surface it as a BuildError on
# the task's first target, tagged with the standard interrupt message.
if self.interrupted():
raise SCons.Errors.BuildError(
    task.targets[0], errstr=interrupt_msg)
# Let the failed() callback function arrange for the
# build to stop if that's appropriate.
# No more tasks: give the taskmaster a chance to clean up.
self.taskmaster.cleanup()
220 # Trap import failure so that everything in the Job module but the
221 # Parallel class (and its dependent classes) will work if the interpreter
222 # doesn't support threads.
class Worker(threading.Thread):
    """A worker thread waits on a task to be posted to its request queue,
    dequeues the task, executes it, and posts a tuple including the task
    and a boolean indicating whether the task executed successfully."""

    def __init__(self, requestQueue, resultsQueue, interrupted):
        threading.Thread.__init__(self)
        # Shared queues: tasks arrive on requestQueue; (task, ok) result
        # tuples are reported back on resultsQueue.
        self.requestQueue = requestQueue
        self.resultsQueue = resultsQueue
        # Shared InterruptState flag, polled before executing a task.
        self.interrupted = interrupted

    # NOTE(review): the 'def run(self):' line and its loop structure are
    # not visible in this chunk; the statements below are the visible
    # portion of the worker's run loop.
        # Block until a task (or the None sentinel) is posted.
        task = self.requestQueue.get()
        # The "None" value is used as a sentinel by
        # ThreadPool.cleanup(). This indicates that there
        # are no more tasks, so we should quit.
        if self.interrupted():
            # Interrupted: record a BuildError against the task's
            # first target, carrying the standard interrupt message.
            raise SCons.Errors.BuildError(
                task.targets[0], errstr=interrupt_msg)
        # Report the task and whether it executed successfully.
        self.resultsQueue.put((task, ok))
"""This class is responsible for spawning and managing worker threads."""
# NOTE(review): the enclosing 'class ThreadPool:' statement is not
# visible in this chunk; the method below is its constructor.

def __init__(self, num, stack_size, interrupted):
    """Create the request and reply queues, and 'num' worker threads.

    One must specify the stack size of the worker threads. The
    stack size is specified in kilobytes.
    """
    # Unbounded (maxsize=0) queues shared with every Worker thread.
    self.requestQueue = Queue.Queue(0)
    self.resultsQueue = Queue.Queue(0)

    # Try to set the per-thread stack size for the workers about to be
    # created; threading.stack_size() does not exist on older Pythons.
    # NOTE(review): the 'try:' opening this guard is not visible here.
    prev_size = threading.stack_size(stack_size*1024)
    except AttributeError, e:
        # Only print a warning if the stack size has been
        # explicitly set; silently ignore the default case.
        if not explicit_stack_size is None:
            msg = "Setting stack size is unsupported by this version of Python:\n " + \
            # NOTE(review): the continuation line of this expression is
            # not visible in this chunk.
            SCons.Warnings.warn(SCons.Warnings.StackSizeWarning, msg)
    except ValueError, e:
        # The platform rejected the requested stack size.
        msg = "Setting stack size failed:\n " + str(e)
        SCons.Warnings.warn(SCons.Warnings.StackSizeWarning, msg)

    # Create worker threads
    # NOTE(review): the loop header creating 'num' workers and the
    # initialization of self.workers are not visible in this chunk.
    worker = Worker(self.requestQueue, self.resultsQueue, interrupted)
    self.workers.append(worker)

    # Once we drop Python 1.5 we can change the following to:
    #if 'prev_size' in locals():
    if 'prev_size' in locals().keys():
        # Restore the interpreter-wide stack size for later threads.
        threading.stack_size(prev_size)
"""Put task into request queue."""
# NOTE(review): the 'def put(self, task):' line is not visible in this
# chunk; this is the body of ThreadPool.put, which hands a task to the
# next free worker.
self.requestQueue.put(task)

"""Remove and return a result tuple from the results queue."""
# NOTE(review): the 'def get(self):' line is not visible in this chunk;
# this is the body of ThreadPool.get.  Queue.get() blocks until a
# worker posts a (task, ok) tuple.
return self.resultsQueue.get()
def preparation_failed(self, task):
    """Record 'task' as unsuccessful by posting (task, False) directly
    to the results queue, bypassing the worker threads."""
    failure = (task, False)
    self.resultsQueue.put(failure)
"""
Shuts down the thread pool, giving each worker thread a
chance to shut down gracefully.
"""
# NOTE(review): the 'def cleanup(self):' line is not visible in this
# chunk; this is the body of ThreadPool.cleanup.

# For each worker thread, put a sentinel "None" value
# on the requestQueue (indicating that there's no work
# to be done) so that each worker thread will get one and
# terminate gracefully.
for _ in self.workers:
    self.requestQueue.put(None)

# Wait for all of the workers to terminate.
#
# If we don't do this, later Python versions (2.4, 2.5) often
# seem to raise exceptions during shutdown. This happens
# in requestQueue.get(), as an assertion failure that
# requestQueue.not_full is notified while not acquired,
# seemingly because the main thread has shut down (or is
# in the process of doing so) while the workers are still
# trying to pull sentinels off the requestQueue.
#
# Normally these terminations should happen fairly quickly,
# but we'll stick a one-second timeout on here just in case.
for worker in self.workers:
    # NOTE(review): the join call (with the one-second timeout described
    # above) is not visible in this chunk.
342 """This class is used to execute tasks in parallel, and is somewhat
343 less efficient than Serial, but is appropriate for parallel builds.
345 This class is thread safe.
def __init__(self, taskmaster, num, stack_size):
    """Create a new parallel job given a taskmaster.

    The taskmaster's next_task() method should return the next
    task that needs to be executed, or None if there are no more
    tasks. The taskmaster's executed() method will be called
    for each task when it is successfully executed or failed()
    will be called if the task failed to execute (i.e. execute()
    raised an exception).

    Note: calls to taskmaster are serialized, but calls to
    execute() on distinct tasks are not serialized, because
    that is the whole point of parallel jobs: they can execute
    multiple tasks simultaneously. """

    self.taskmaster = taskmaster
    # Shared interrupt flag, visible to every worker thread so an
    # in-flight task can be aborted with a BuildError.
    self.interrupted = InterruptState()
    # Worker pool of 'num' threads, each with a 'stack_size'-KB stack.
    self.tp = ThreadPool(num, stack_size, self.interrupted)

    # Fix: start() reads self.maxjobs to bound how many tasks are in
    # flight at once, but the visible constructor never set it, which
    # would raise AttributeError on the first call to start().
    self.maxjobs = num
"""Start the job. This will begin pulling tasks from the
taskmaster and executing them, and return when there are no
more tasks. If a task fails to execute (i.e. execute() raises
an exception), then the job will stop."""
# NOTE(review): the enclosing 'def start(self):' line, the 'jobs'
# counter initialization, and much of this method's loop structure are
# not visible in this chunk; the comments below annotate only the
# visible statements.

# Start up as many available tasks as we're
# allowed to run at once (bounded by self.maxjobs).
while jobs < self.maxjobs:
    task = self.taskmaster.next_task()

    # prepare task for execution
    if task.needs_execute():

# Nothing queued and nothing running: the build is finished.
if not task and not jobs: break

# Let any/all completed tasks finish up before we go
# back and put the next batch of tasks on the queue.
task, ok = self.tp.get()

# A signal interrupted the build: report it as a BuildError on the
# task's first target with the standard interrupt message.
if self.interrupted():
    raise SCons.Errors.BuildError(
        task.targets[0], errstr=interrupt_msg)

# Let the failed() callback function arrange
# for the build to stop if that's appropriate.

# NOTE(review): the body of this emptiness check is not visible --
# presumably it exits the result-draining loop; confirm.
if self.tp.resultsQueue.empty():

# All work drained: let the taskmaster clean up.
self.taskmaster.cleanup()