Toplevel directory cleanup
[senf.git] / tools / scons-1.2.0 / engine / SCons / Job.py
1 """SCons.Job
2
3 This module defines the Serial and Parallel classes that execute tasks to
4 complete a build. The Jobs class provides a higher level interface to start,
5 stop, and wait on jobs.
6
7 """
8
9 #
10 # Copyright (c) 2001, 2002, 2003, 2004, 2005, 2006, 2007, 2008 The SCons Foundation
11 #
12 # Permission is hereby granted, free of charge, to any person obtaining
13 # a copy of this software and associated documentation files (the
14 # "Software"), to deal in the Software without restriction, including
15 # without limitation the rights to use, copy, modify, merge, publish,
16 # distribute, sublicense, and/or sell copies of the Software, and to
17 # permit persons to whom the Software is furnished to do so, subject to
18 # the following conditions:
19 #
20 # The above copyright notice and this permission notice shall be included
21 # in all copies or substantial portions of the Software.
22 #
23 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY
24 # KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
25 # WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
26 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
27 # LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
28 # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
29 # WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
30 #
31
32 __revision__ = "src/engine/SCons/Job.py 3842 2008/12/20 22:59:52 scons"
33
34 import os
35 import signal
36
37 import SCons.Errors
38
39 # The default stack size (in kilobytes) of the threads used to execute
40 # jobs in parallel.
41 #
42 # We use a stack size of 256 kilobytes. The default on some platforms
43 # is too large and prevents us from creating enough threads to fully
44 # parallelized the build. For example, the default stack size on linux
45 # is 8 MBytes.
46
47 explicit_stack_size = None
48 default_stack_size = 256
49
50 interrupt_msg = 'Build interrupted.'
51
52
53 class InterruptState:
54    def __init__(self):
55        self.interrupted = False
56
57    def set(self):
58        self.interrupted = True
59
60    def __call__(self):
61        return self.interrupted
62
63
64 class Jobs:
65     """An instance of this class initializes N jobs, and provides
66     methods for starting, stopping, and waiting on all N jobs.
67     """
68
69     def __init__(self, num, taskmaster):
70         """
71         create 'num' jobs using the given taskmaster.
72
73         If 'num' is 1 or less, then a serial job will be used,
74         otherwise a parallel job with 'num' worker threads will
75         be used.
76
77         The 'num_jobs' attribute will be set to the actual number of jobs
78         allocated.  If more than one job is requested but the Parallel
79         class can't do it, it gets reset to 1.  Wrapping interfaces that
80         care should check the value of 'num_jobs' after initialization.
81         """
82
83         self.job = None
84         if num > 1:
85             stack_size = explicit_stack_size
86             if stack_size is None:
87                 stack_size = default_stack_size
88                 
89             try:
90                 self.job = Parallel(taskmaster, num, stack_size)
91                 self.num_jobs = num
92             except NameError:
93                 pass
94         if self.job is None:
95             self.job = Serial(taskmaster)
96             self.num_jobs = 1
97
98     def run(self, postfunc=lambda: None):
99         """Run the jobs.
100
101         postfunc() will be invoked after the jobs has run. It will be
102         invoked even if the jobs are interrupted by a keyboard
103         interrupt (well, in fact by a signal such as either SIGINT,
104         SIGTERM or SIGHUP). The execution of postfunc() is protected
105         against keyboard interrupts and is guaranteed to run to
106         completion."""
107         self._setup_sig_handler()
108         try:
109             self.job.start()
110         finally:
111             postfunc()
112             self._reset_sig_handler()
113
114     def were_interrupted(self):
115         """Returns whether the jobs were interrupted by a signal."""
116         return self.job.interrupted()
117
118     def _setup_sig_handler(self):
119         """Setup an interrupt handler so that SCons can shutdown cleanly in
120         various conditions:
121
122           a) SIGINT: Keyboard interrupt
123           b) SIGTERM: kill or system shutdown
124           c) SIGHUP: Controlling shell exiting
125
126         We handle all of these cases by stopping the taskmaster. It
127         turns out that it very difficult to stop the build process
128         by throwing asynchronously an exception such as
129         KeyboardInterrupt. For example, the python Condition
130         variables (threading.Condition) and Queue's do not seem to
131         asynchronous-exception-safe. It would require adding a whole
132         bunch of try/finally block and except KeyboardInterrupt all
133         over the place.
134
135         Note also that we have to be careful to handle the case when
136         SCons forks before executing another process. In that case, we
137         want the child to exit immediately.
138         """
139         def handler(signum, stack, self=self, parentpid=os.getpid()):
140             if os.getpid() == parentpid:
141                 self.job.taskmaster.stop()
142                 self.job.interrupted.set()
143             else:
144                 os._exit(2)
145
146         self.old_sigint  = signal.signal(signal.SIGINT, handler)
147         self.old_sigterm = signal.signal(signal.SIGTERM, handler)
148         try:
149             self.old_sighup = signal.signal(signal.SIGHUP, handler)
150         except AttributeError:
151             pass
152
153     def _reset_sig_handler(self):
154         """Restore the signal handlers to their previous state (before the
155          call to _setup_sig_handler()."""
156
157         signal.signal(signal.SIGINT, self.old_sigint)
158         signal.signal(signal.SIGTERM, self.old_sigterm)
159         try:
160             signal.signal(signal.SIGHUP, self.old_sighup)
161         except AttributeError:
162             pass
163
164 class Serial:
165     """This class is used to execute tasks in series, and is more efficient
166     than Parallel, but is only appropriate for non-parallel builds. Only
167     one instance of this class should be in existence at a time.
168
169     This class is not thread safe.
170     """
171
172     def __init__(self, taskmaster):
173         """Create a new serial job given a taskmaster. 
174
175         The taskmaster's next_task() method should return the next task
176         that needs to be executed, or None if there are no more tasks. The
177         taskmaster's executed() method will be called for each task when it
178         is successfully executed or failed() will be called if it failed to
179         execute (e.g. execute() raised an exception)."""
180         
181         self.taskmaster = taskmaster
182         self.interrupted = InterruptState()
183
184     def start(self):
185         """Start the job. This will begin pulling tasks from the taskmaster
186         and executing them, and return when there are no more tasks. If a task
187         fails to execute (i.e. execute() raises an exception), then the job will
188         stop."""
189         
190         while 1:
191             task = self.taskmaster.next_task()
192
193             if task is None:
194                 break
195
196             try:
197                 task.prepare()
198                 if task.needs_execute():
199                     task.execute()
200             except:
201                 if self.interrupted():
202                     try:
203                         raise SCons.Errors.BuildError(
204                             task.targets[0], errstr=interrupt_msg)
205                     except:
206                         task.exception_set()
207                 else:
208                     task.exception_set()
209
210                 # Let the failed() callback function arrange for the
211                 # build to stop if that's appropriate.
212                 task.failed()
213             else:
214                 task.executed()
215
216             task.postprocess()
217         self.taskmaster.cleanup()
218
219
220 # Trap import failure so that everything in the Job module but the
221 # Parallel class (and its dependent classes) will work if the interpreter
222 # doesn't support threads.
223 try:
224     import Queue
225     import threading
226 except ImportError:
227     pass
228 else:
229     class Worker(threading.Thread):
230         """A worker thread waits on a task to be posted to its request queue,
231         dequeues the task, executes it, and posts a tuple including the task
232         and a boolean indicating whether the task executed successfully. """
233
234         def __init__(self, requestQueue, resultsQueue, interrupted):
235             threading.Thread.__init__(self)
236             self.setDaemon(1)
237             self.requestQueue = requestQueue
238             self.resultsQueue = resultsQueue
239             self.interrupted = interrupted
240             self.start()
241
242         def run(self):
243             while 1:
244                 task = self.requestQueue.get()
245
246                 if task is None:
247                     # The "None" value is used as a sentinel by
248                     # ThreadPool.cleanup().  This indicates that there
249                     # are no more tasks, so we should quit.
250                     break
251
252                 try:
253                     if self.interrupted():
254                         raise SCons.Errors.BuildError(
255                             task.targets[0], errstr=interrupt_msg)
256                     task.execute()
257                 except:
258                     task.exception_set()
259                     ok = False
260                 else:
261                     ok = True
262
263                 self.resultsQueue.put((task, ok))
264
265     class ThreadPool:
266         """This class is responsible for spawning and managing worker threads."""
267
268         def __init__(self, num, stack_size, interrupted):
269             """Create the request and reply queues, and 'num' worker threads.
270             
271             One must specify the stack size of the worker threads. The
272             stack size is specified in kilobytes.
273             """
274             self.requestQueue = Queue.Queue(0)
275             self.resultsQueue = Queue.Queue(0)
276
277             try:
278                 prev_size = threading.stack_size(stack_size*1024) 
279             except AttributeError, e:
280                 # Only print a warning if the stack size has been
281                 # explicitly set.
282                 if not explicit_stack_size is None:
283                     msg = "Setting stack size is unsupported by this version of Python:\n    " + \
284                         e.args[0]
285                     SCons.Warnings.warn(SCons.Warnings.StackSizeWarning, msg)
286             except ValueError, e:
287                 msg = "Setting stack size failed:\n    " + str(e)
288                 SCons.Warnings.warn(SCons.Warnings.StackSizeWarning, msg)
289
290             # Create worker threads
291             self.workers = []
292             for _ in range(num):
293                 worker = Worker(self.requestQueue, self.resultsQueue, interrupted)
294                 self.workers.append(worker)
295
296             # Once we drop Python 1.5 we can change the following to:
297             #if 'prev_size' in locals():
298             if 'prev_size' in locals().keys():
299                 threading.stack_size(prev_size)
300
301         def put(self, task):
302             """Put task into request queue."""
303             self.requestQueue.put(task)
304
305         def get(self):
306             """Remove and return a result tuple from the results queue."""
307             return self.resultsQueue.get()
308
309         def preparation_failed(self, task):
310             self.resultsQueue.put((task, False))
311
312         def cleanup(self):
313             """
314             Shuts down the thread pool, giving each worker thread a
315             chance to shut down gracefully.
316             """
317             # For each worker thread, put a sentinel "None" value
318             # on the requestQueue (indicating that there's no work
319             # to be done) so that each worker thread will get one and
320             # terminate gracefully.
321             for _ in self.workers:
322                 self.requestQueue.put(None)
323
324             # Wait for all of the workers to terminate.
325             # 
326             # If we don't do this, later Python versions (2.4, 2.5) often
327             # seem to raise exceptions during shutdown.  This happens
328             # in requestQueue.get(), as an assertion failure that
329             # requestQueue.not_full is notified while not acquired,
330             # seemingly because the main thread has shut down (or is
331             # in the process of doing so) while the workers are still
332             # trying to pull sentinels off the requestQueue.
333             #
334             # Normally these terminations should happen fairly quickly,
335             # but we'll stick a one-second timeout on here just in case
336             # someone gets hung.
337             for worker in self.workers:
338                 worker.join(1.0)
339             self.workers = []
340
341     class Parallel:
342         """This class is used to execute tasks in parallel, and is somewhat 
343         less efficient than Serial, but is appropriate for parallel builds.
344
345         This class is thread safe.
346         """
347
348         def __init__(self, taskmaster, num, stack_size):
349             """Create a new parallel job given a taskmaster.
350
351             The taskmaster's next_task() method should return the next
352             task that needs to be executed, or None if there are no more
353             tasks. The taskmaster's executed() method will be called
354             for each task when it is successfully executed or failed()
355             will be called if the task failed to execute (i.e. execute()
356             raised an exception).
357
358             Note: calls to taskmaster are serialized, but calls to
359             execute() on distinct tasks are not serialized, because
360             that is the whole point of parallel jobs: they can execute
361             multiple tasks simultaneously. """
362
363             self.taskmaster = taskmaster
364             self.interrupted = InterruptState()
365             self.tp = ThreadPool(num, stack_size, self.interrupted)
366
367             self.maxjobs = num
368
369         def start(self):
370             """Start the job. This will begin pulling tasks from the
371             taskmaster and executing them, and return when there are no
372             more tasks. If a task fails to execute (i.e. execute() raises
373             an exception), then the job will stop."""
374
375             jobs = 0
376             
377             while 1:
378                 # Start up as many available tasks as we're
379                 # allowed to.
380                 while jobs < self.maxjobs:
381                     task = self.taskmaster.next_task()
382                     if task is None:
383                         break
384
385                     try:
386                         # prepare task for execution
387                         task.prepare()
388                     except:
389                         task.exception_set()
390                         task.failed()
391                         task.postprocess()
392                     else:
393                         if task.needs_execute():
394                             # dispatch task
395                             self.tp.put(task)
396                             jobs = jobs + 1
397                         else:
398                             task.executed()
399                             task.postprocess()
400
401                 if not task and not jobs: break
402
403                 # Let any/all completed tasks finish up before we go
404                 # back and put the next batch of tasks on the queue.
405                 while 1:
406                     task, ok = self.tp.get()
407                     jobs = jobs - 1
408
409                     if ok:
410                         task.executed()
411                     else:
412                         if self.interrupted():
413                             try:
414                                 raise SCons.Errors.BuildError(
415                                     task.targets[0], errstr=interrupt_msg)
416                             except:
417                                 task.exception_set()
418
419                         # Let the failed() callback function arrange
420                         # for the build to stop if that's appropriate.
421                         task.failed()
422
423                     task.postprocess()
424
425                     if self.tp.resultsQueue.empty():
426                         break
427
428             self.tp.cleanup()
429             self.taskmaster.cleanup()