# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""The framework stage that produces the next generation of tasks to run.

Part of the Chrome build flags optimization.
"""

__author__ = 'yuhenglong@google.com (Yuheng Long)'

import pipeline_process


def Steering(cache, generations, input_queue, result_queue):
  """The core method template that produces the next generation of tasks to run.

  This method waits for the results of the tasks from the previous generation.
  Upon the arrival of all these results, the method uses them to generate the
  next generation of tasks.

  The main logic of producing the next generation from the previous generation
  is application specific. For example, in the genetic algorithm, a task is
  produced by combining two parent tasks, while in the hill climbing
  algorithm, a task is generated by its immediate neighbor. The method 'Next'
  is overridden in the concrete subclasses of the class Generation to produce
  the next application-specific generation. The steering method invokes the
  'Next' method, produces the next generation and submits the tasks in this
  generation to the next stage, e.g., the build/compilation stage.

  Args:
    cache: It stores the experiments that have been conducted before. Used to
      avoid duplicate work. It is queried with 'in' and updated with 'add'.
    generations: The initial generations of tasks to be run. This collection
      is copied, so the caller's object is never modified.
    input_queue: The input results from the last stage of the framework. These
      results will trigger a new iteration of the algorithm. Its 'get' method
      is assumed to block until a result is available, as the standard
      library queues do.
    result_queue: The output task queue for this pipeline stage. The new tasks
      generated by the steering algorithm will be sent to the next stage via
      this queue, followed by pipeline_process.POISONPILL once there will be
      no more tasks.

  Raises:
    StopIteration: A task arrived on input_queue that no pending generation
      accepted through its UpdateTask method.
  """

  # Generations that have pending tasks to be executed. Pending tasks are those
  # whose results are not ready. The tasks that have their results ready are
  # referred to as ready tasks. Once there is no pending generation, the
  # algorithm terminates. A private copy is used so that the bookkeeping below
  # (remove/append) does not alter the list owned by the caller.
  waiting = list(generations)

  # Record how many initial tasks there are. If there is no task at all, the
  # algorithm can terminate right away.
  num_tasks = 0

  # Submit all the tasks in the initial generations to the next stage of the
  # framework. The next stage can be the build/compilation stage.
  for generation in waiting:
    # Only send the tasks that have not been performed before to the next
    # stage. The selection is materialized before the cache is updated, so the
    # cache additions below do not influence which tasks of this pool are sent.
    new_tasks = [task for task in generation.Pool() if task not in cache]
    for task in new_tasks:
      result_queue.put(task)
      cache.add(task)
      num_tasks += 1

  # If there is no task to be executed at all, the algorithm returns right away.
  if not num_tasks:
    # Inform the next stage that there will be no more task.
    result_queue.put(pipeline_process.POISONPILL)
    return

  # The algorithm is done if there is no pending generation. A generation is
  # pending if it has pending tasks.
  while waiting:
    # Block until a task whose result is ready arrives from the last stage of
    # the feedback loop. Blocking here, instead of polling 'empty()' in a tight
    # loop, keeps this stage from spinning a CPU core while it is idle.
    task = input_queue.get()

    # Store the result of this ready task. Intermediate results can be used to
    # generate a report for the final result or be used to reboot from a crash
    # caused by the failure of any module of the framework.
    task.LogSteeringCost()

    # Find out which pending generation this ready task belongs to. This pending
    # generation will have one less pending task. The "next" expression iterates
    # the generations in waiting until the first generation whose UpdateTask
    # method returns true; generations after that one are not consulted.
    generation = next(gen for gen in waiting if gen.UpdateTask(task))

    # If there is still any pending task, do nothing.
    if not generation.Done():
      continue

    # All the tasks in the generation are finished. The generation is ready to
    # produce the next generation.
    waiting.remove(generation)

    # Check whether a generation should generate the next generation.
    # A generation may not generate the next generation, e.g., because a
    # fixpoint has been reached, there has not been any improvement for a few
    # generations or a local maximum is reached.
    if not generation.IsImproved():
      continue

    for new_generation in generation.Next(cache):
      # Make sure that each generation contains at least one task.
      assert new_generation.Pool()
      waiting.append(new_generation)

      # Send the tasks of the new generations to the next stage for execution.
      for new_task in new_generation.Pool():
        result_queue.put(new_task)
        cache.add(new_task)

  # Steering algorithm is finished and it informs the next stage that there will
  # be no more task.
  result_queue.put(pipeline_process.POISONPILL)