forked from mirrors/gecko-dev
commit b8d8f8a712

This adds `.cron.yml` and a new mach command to interpret it. While functionality is limited to nightlies right now, there is room to expand to more diverse periodic tasks. Let your imagination run wild!

MozReview-Commit-ID: KxQkaUbsjQs
--HG--
extra : rebase_source : ddf0a1eadae5a1169c0ead7bcb7b9ce61b255fbf
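For orientation, a purely hypothetical sketch of the feature the commit message describes: the `.cron.yml` schema is not shown on this page, so the `jobs`, `name`, and `when` fields below are assumptions, not the real format.

# Hypothetical sketch only: loads a .cron.yml-style file and lists its
# entries. The 'jobs', 'name', and 'when' keys are illustrative assumptions;
# the actual schema is defined by the commit itself, not reproduced here.
import yaml

with open('.cron.yml') as f:
    cron_config = yaml.safe_load(f)

for job in cron_config.get('jobs', []):
    print('{}: {}'.format(job.get('name'), job.get('when')))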
122 lines · 4.8 KiB · Python
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

from __future__ import absolute_import, print_function, unicode_literals

import concurrent.futures as futures
import requests
import requests.adapters
import json
import os
import logging

from slugid import nice as slugid
from taskgraph.util.time import (
    current_json_time,
    json_time_from_now
)

logger = logging.getLogger(__name__)

# the maximum number of parallel createTask calls to make
CONCURRENCY = 50


def create_tasks(taskgraph, label_to_taskid, params):
    taskid_to_label = {t: l for l, t in label_to_taskid.iteritems()}

    session = requests.Session()

    # Default HTTPAdapter uses 10 connections. Mount custom adapter to increase
    # that limit. Connections are established as needed, so using a large value
    # should not negatively impact performance.
    http_adapter = requests.adapters.HTTPAdapter(pool_connections=CONCURRENCY,
                                                 pool_maxsize=CONCURRENCY)
    session.mount('https://', http_adapter)
    session.mount('http://', http_adapter)

    decision_task_id = os.environ.get('TASK_ID')

    # when running as an actual decision task, we use the decision task's
    # taskId as the taskGroupId.  The process that created the decision task
    # helpfully placed it in this same taskGroup.  If there is no $TASK_ID,
    # fall back to a slugid
    task_group_id = decision_task_id or slugid()
    scheduler_id = 'gecko-level-{}'.format(params['level'])

    with futures.ThreadPoolExecutor(CONCURRENCY) as e:
        fs = {}

        # We can't submit a task until its dependencies have been submitted.
        # So our strategy is to walk the graph and submit tasks once all
        # their dependencies have been submitted.
        #
        # Using visit_postorder() here isn't the most efficient: we'll
        # block waiting for dependencies of task N to submit even though
        # dependencies for task N+1 may be finished. If we need to optimize
        # this further, we can build a graph of task dependencies and walk
        # that.
        for task_id in taskgraph.graph.visit_postorder():
            task_def = taskgraph.tasks[task_id].task
            attributes = taskgraph.tasks[task_id].attributes
            # if this task has no dependencies, make it depend on this decision
            # task so that it does not start immediately; and so that if this loop
            # fails halfway through, none of the already-created tasks run.
            if decision_task_id and not task_def.get('dependencies'):
                task_def['dependencies'] = [decision_task_id]

            task_def['taskGroupId'] = task_group_id
            task_def['schedulerId'] = scheduler_id

            # Wait for dependencies before submitting this.
            deps_fs = [fs[dep] for dep in task_def.get('dependencies', [])
                       if dep in fs]
            for f in futures.as_completed(deps_fs):
                f.result()

            fs[task_id] = e.submit(create_task, session, task_id,
                                   taskid_to_label[task_id], task_def)

            # Schedule tasks as many times as task_duplicates indicates
            for i in range(1, attributes.get('task_duplicates', 1)):
                # We use slugid() since we want a distinct task id
                fs[task_id] = e.submit(create_task, session, slugid(),
                                       taskid_to_label[task_id], task_def)

        # Wait for all futures to complete.
        for f in futures.as_completed(fs.values()):
            f.result()
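# Usage note (illustrative, not part of the original file): create_tasks
# expects label_to_taskid to map task labels to pre-generated taskIds, and
# params to carry the SCM level used to build schedulerId, e.g.:
#   label_to_taskid = {'build-linux64/opt': 'fVsLdG1OQ5qzPuLsYcOZVw'}
#   params = {'level': '3'}
# The label and taskId values above are made up for illustration.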
def create_task(session, task_id, label, task_def):
    # create the task using 'http://taskcluster/queue', which is proxied to the queue service
    # with credentials appropriate to this job.

    # Resolve timestamps
    now = current_json_time(datetime_format=True)
    task_def = resolve_timestamps(now, task_def)

    logger.debug("Creating task with taskId {} for {}".format(task_id, label))
    res = session.put('http://taskcluster/queue/v1/task/{}'.format(task_id),
                      data=json.dumps(task_def))
    if res.status_code != 200:
        # log the error message from the queue if one can be extracted, then
        # raise; catching Exception (rather than a bare except:) avoids
        # swallowing KeyboardInterrupt and SystemExit
        try:
            logger.error(res.json()['message'])
        except Exception:
            logger.error(res.text)
        res.raise_for_status()


def resolve_timestamps(now, task_def):
    def recurse(val):
        if isinstance(val, list):
            return [recurse(v) for v in val]
        elif isinstance(val, dict):
            if val.keys() == ['relative-datestamp']:
                return json_time_from_now(val['relative-datestamp'], now)
            else:
                return {k: recurse(v) for k, v in val.iteritems()}
        else:
            return val
    return recurse(task_def)
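A brief usage sketch, not part of the file: resolve_timestamps walks a task definition and replaces every {'relative-datestamp': ...} leaf with an absolute timestamp computed from now, leaving all other values untouched. The task definition below is made up for illustration.

# Illustrative only: exercises resolve_timestamps on a toy task definition.
now = current_json_time(datetime_format=True)
task_def = {
    'created': {'relative-datestamp': '0 seconds'},
    'deadline': {'relative-datestamp': '1 day'},
    'payload': {'env': {'PERIODIC': '1'}},
}
resolved = resolve_timestamps(now, task_def)
# resolved['created'] and resolved['deadline'] are now absolute JSON time
# strings; resolved['payload'] comes back unchanged.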