Bug 1666809 - Insert decision task indexes directly via taskcluster API rather than index-task, r=taskgraph-reviewers,aki

We started using the "backstop" index added by bug 1660506 to determine whether
a push should be a backstop based on a time interval. The problem is that this
index gets added by an index-task that runs after the decision task has
completed. Therefore, if two pushes land at roughly the same time (i.e., the
second decision task starts before the first completes), then they can both
determine themselves as backstops.

This patch gets around the problem by inserting the "backstop" index as early
as possible (immediately after resolving parameters), so the chances of this
happening become very low. It's still theoretically possible that it could
happen again, but we don't need this to be 100% perfect. As long as it is rare,
it's good enough.

Depends on D91191

Differential Revision: https://phabricator.services.mozilla.com/D91192
This commit is contained in:
Andrew Halberstadt 2020-09-24 19:19:36 +00:00
parent da707c8ee1
commit c288f18538
4 changed files with 34 additions and 42 deletions

View file

@ -157,6 +157,7 @@ tasks:
- 'assume:repo:${repoUrl[8:]}:branch:default' - 'assume:repo:${repoUrl[8:]}:branch:default'
- 'queue:route:notify.email.${ownerEmail}.*' - 'queue:route:notify.email.${ownerEmail}.*'
- 'in-tree:hook-action:project-${trustDomain}/in-tree-action-${repository.level}-*' - 'in-tree:hook-action:project-${trustDomain}/in-tree-action-${repository.level}-*'
- 'index:insert-task:${trustDomain}.v2.${repository.project}.*'
else: else:
$if: 'tasks_for == "action"' $if: 'tasks_for == "action"'
then: then:

View file

@ -31,7 +31,7 @@ from .util.chunking import resolver
from .util.hg import get_hg_revision_branch, get_hg_commit_message from .util.hg import get_hg_revision_branch, get_hg_commit_message
from .util.partials import populate_release_history from .util.partials import populate_release_history
from .util.schema import validate_schema, Schema from .util.schema import validate_schema, Schema
from .util.taskcluster import get_artifact from .util.taskcluster import get_artifact, insert_index
from .util.taskgraph import find_decision_task, find_existing_tasks_from_previous_kinds from .util.taskgraph import find_decision_task, find_existing_tasks_from_previous_kinds
from .util.yaml import load_yaml from .util.yaml import load_yaml
from voluptuous import Required, Optional from voluptuous import Required, Optional
@ -228,6 +228,9 @@ def taskgraph_decision(options, parameters=None):
write_artifacts=True, write_artifacts=True,
) )
# set additional index paths for the decision task
set_decision_indexes(decision_task_id, tgg.parameters, tgg.graph_config)
# write out the parameters used to generate this graph # write out the parameters used to generate this graph
write_artifact('parameters.yml', dict(**tgg.parameters)) write_artifact('parameters.yml', dict(**tgg.parameters))
@ -469,6 +472,19 @@ def set_try_config(parameters, task_config_file):
parameters['optimize_target_tasks'] = True parameters['optimize_target_tasks'] = True
def set_decision_indexes(decision_task_id, params, graph_config):
    """Insert the extra index paths that should point at the decision task.

    When ``params["backstop"]`` is truthy, the backstop index path template
    is selected. Templates are expanded with ``str.format`` using a copy of
    ``params`` extended with the ``trust-domain`` value read from
    ``graph_config``. Every expanded path is then passed to ``insert_index``
    together with ``decision_task_id`` and ``use_proxy=True``.
    """
    templates = (
        ["{trust-domain}.v2.{project}.latest.taskgraph.backstop"]
        if params["backstop"]
        else []
    )

    # Work on a copy so the caller's parameters are left untouched.
    substitutions = params.copy()
    substitutions["trust-domain"] = graph_config["trust-domain"]

    resolved_paths = [template.format(**substitutions) for template in templates]
    for resolved in resolved_paths:
        insert_index(resolved, decision_task_id, use_proxy=True)
def write_artifact(filename, data): def write_artifact(filename, data):
logger.info('writing artifact file `{}`'.format(filename)) logger.info('writing artifact file `{}`'.format(filename))
if not os.path.isdir(ARTIFACTS_DIR): if not os.path.isdir(ARTIFACTS_DIR):

View file

@ -30,7 +30,6 @@ from slugid import nice as slugid
from .task import Task from .task import Task
from .graph import Graph from .graph import Graph
from .taskgraph import TaskGraph from .taskgraph import TaskGraph
from .util.taskcluster import get_task_definition
from .util.workertypes import get_worker_type from .util.workertypes import get_worker_type
here = os.path.abspath(os.path.dirname(__file__)) here = os.path.abspath(os.path.dirname(__file__))
@ -122,17 +121,6 @@ def derive_misc_task(
return task return task
def get_decision_indexes(parameters, graph_config):
    """Return the expanded extra index paths for the decision task.

    The backstop index path is included only when ``parameters["backstop"]``
    is truthy. Path templates are expanded with ``str.format`` using a copy
    of ``parameters`` extended with the ``trust-domain`` value read from
    ``graph_config``. The result is a (possibly empty) list of strings.
    """
    templates = (
        ["{trust-domain}.v2.{project}.latest.taskgraph.backstop"]
        if parameters["backstop"]
        else []
    )

    # Work on a copy so the caller's parameters are left untouched.
    substitutions = parameters.copy()
    substitutions["trust-domain"] = graph_config["trust-domain"]

    return [template.format(**substitutions) for template in templates]
# these regular expressions capture route prefixes for which we have a star # these regular expressions capture route prefixes for which we have a star
# scope, allowing them to be summarized. Each should correspond to a star scope # scope, allowing them to be summarized. Each should correspond to a star scope
# in each Gecko `assume:repo:hg.mozilla.org/...` role. # in each Gecko `assume:repo:hg.mozilla.org/...` role.
@ -198,36 +186,8 @@ def add_index_tasks(
""" """
logger.debug('Morphing: adding index tasks') logger.debug('Morphing: adding index tasks')
added = []
# Add an index task for the decision task itself.
index_paths = get_decision_indexes(parameters, graph_config)
if index_paths:
task_def = get_task_definition(decision_task_id)
decision_task = Task(
kind="decision",
label="decision",
description="decision task",
attributes={},
task=task_def,
)
decision_task.task_id = decision_task_id
added.append(
make_index_task(
decision_task,
taskgraph,
label_to_taskid,
parameters,
graph_config,
index_paths=index_paths,
index_rank=0,
purpose="index-task",
dependencies={},
)
)
# Add indexes for tasks that exceed MAX_ROUTES. # Add indexes for tasks that exceed MAX_ROUTES.
added = []
for label, task in six.iteritems(taskgraph.tasks): for label, task in six.iteritems(taskgraph.tasks):
if len(task.task.get('routes', [])) <= MAX_ROUTES: if len(task.task.get('routes', [])) <= MAX_ROUTES:
continue continue

View file

@ -220,6 +220,21 @@ def list_tasks(index_path, use_proxy=False):
return [t['taskId'] for t in results] return [t['taskId'] for t in results]
def insert_index(index_path, task_id, data=None, use_proxy=False):
    """Point ``index_path`` at ``task_id`` by issuing a PUT to the index URL.

    The request body carries the task id, a rank of 0, ``data`` (an empty
    dict when ``data`` is falsy) and an ``expires`` value copied from the
    task's own definition. ``use_proxy`` is forwarded to both the URL lookup
    and the task-definition lookup.

    Returns whatever ``_do_request`` returns.
    """
    target_url = get_index_url(index_path, use_proxy=use_proxy)

    # Reuse the task's expiry for the index entry.
    task_definition = get_task_definition(task_id, use_proxy=use_proxy)
    expiry = task_definition["expires"]

    payload = {
        "taskId": task_id,
        "rank": 0,
        "data": data or {},
        "expires": expiry,
    }
    return _do_request(target_url, method="put", json=payload)
def parse_time(timestamp): def parse_time(timestamp):
"""Turn a "JSON timestamp" as used in TC APIs into a datetime""" """Turn a "JSON timestamp" as used in TC APIs into a datetime"""
return datetime.datetime.strptime(timestamp, '%Y-%m-%dT%H:%M:%S.%fZ') return datetime.datetime.strptime(timestamp, '%Y-%m-%dT%H:%M:%S.%fZ')