Bug 1611989 - Refactor try estimates for code re-use. r=ahal

Add duration estimates to the push summary. Refactor the preview script; it needed to move out of tools/tryselect/formatters/ so that it can import the shared estimates module.

Differential Revision: https://phabricator.services.mozilla.com/D61195

--HG--
rename : tools/tryselect/formatters/preview.py => tools/tryselect/preview.py
extra : moz-landing-system : lando
Simon Fraser committed on 2020-02-12 21:06:56 +00:00
parent da0e3ab1b4
commit bb0152beb3
5 changed files with 349 additions and 245 deletions
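The estimate helpers move into a shared module so that both the fzf preview formatter and the push summary can reuse them. The resulting layout, as inferred from the imports and paths in the diff below, is:

    tools/tryselect/preview.py          # fzf preview formatter, moved out of formatters/
    tools/tryselect/push.py             # prints an "estimates:" summary before pushing
    tools/tryselect/selectors/fuzzy.py  # wires the preview script into fzf
    tools/tryselect/util/estimates.py   # new shared helpers (duration_summary, etc.)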

tools/tryselect/formatters/preview.py (deleted)

@@ -1,147 +0,0 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

# This script is intended to be called through fzf as a preview formatter.

from __future__ import absolute_import, print_function

import json
import os
from datetime import timedelta, datetime

import argparse


def process_args():
    argparser = argparse.ArgumentParser()
    argparser.add_argument('-d', '--durations-file', type=str, default=None)
    argparser.add_argument('-g', '--graph-cache', type=str, default=None)
    argparser.add_argument('-q', '--quantiles-file', type=str, default=None)
    argparser.add_argument('tasklist', type=str)
    return argparser.parse_args()


def plain_data(tasklist):
    print("\n".join(sorted(s.strip("'") for s in tasklist.split())))


def find_all_dependencies(graph, tasklist):
    all_dependencies = dict()

    def find_dependencies(task):
        dependencies = set()
        dependencies.add(task)
        if task in all_dependencies:
            return all_dependencies[task]
        for dep in graph.get(task, list()):
            all_dependencies[task] = find_dependencies(dep)
            dependencies.update(all_dependencies[task])
        return dependencies

    full_deps = set()
    for task in tasklist:
        full_deps.update(find_dependencies(task))

    # Since these have been asked for, they're not inherited dependencies.
    return sorted(full_deps - set(tasklist))


def find_longest_path(graph, tasklist, duration_data):
    dep_durations = dict()

    def find_dependency_durations(task):
        if task in dep_durations:
            return dep_durations[task]

        durations = [find_dependency_durations(dep)
                     for dep in graph.get(task, list())]
        durations.append(0.0)
        md = max(durations) + duration_data.get(task, 0.0)
        dep_durations[task] = md
        return md

    longest_paths = [find_dependency_durations(task) for task in tasklist]
    return max(longest_paths)


def determine_quantile(quantiles_file, duration):
    duration = duration.total_seconds()

    with open(quantiles_file) as f:
        f.readline()  # skip header
        boundaries = [float(l.strip()) for l in f.readlines()]
        boundaries.sort()

    for i, v in enumerate(boundaries):
        if duration < v:
            break
    # In case we weren't given 100 elements
    return int(100 * i / len(boundaries))


def duration_data(durations_file, graph_cache_file, quantiles_file, tasklist):
    tasklist = [t.strip("'") for t in tasklist.split()]

    with open(durations_file) as f:
        durations = json.load(f)
    durations = {d['name']: d['mean_duration_seconds'] for d in durations}

    graph = dict()
    if graph_cache_file:
        with open(graph_cache_file) as f:
            graph = json.load(f)
    dependencies = find_all_dependencies(graph, tasklist)
    longest_path = find_longest_path(graph, tasklist, durations)

    dependency_duration = 0.0
    for task in dependencies:
        dependency_duration += int(durations.get(task, 0.0))

    total_requested_duration = 0.0
    for task in tasklist:
        duration = int(durations.get(task, 0.0))
        total_requested_duration += duration

    output = ""
    duration_width = 5  # show five numbers at most.
    max_columns = int(os.environ['FZF_PREVIEW_COLUMNS'])

    total_requested_duration = timedelta(seconds=total_requested_duration)
    total_dependency_duration = timedelta(seconds=dependency_duration)

    output += "\nSelected tasks take {}\n".format(total_requested_duration)
    output += "+{} dependencies, total {}\n".format(
        len(dependencies), total_dependency_duration + total_requested_duration)

    quantile = None
    if quantiles_file and os.path.isfile(quantiles_file):
        quantile = 100 - determine_quantile(quantiles_file,
                                            total_dependency_duration + total_requested_duration)
    if quantile:
        output += "This is in the top {}% of requests\n".format(quantile)
    output += "Estimated finish in {} at {}".format(
        timedelta(seconds=int(longest_path)),
        (datetime.now()+timedelta(seconds=longest_path)).strftime("%H:%M"))

    output += "{:>{width}}\n".format("Duration", width=max_columns)
    for task in tasklist:
        duration = int(durations.get(task, 0.0))
        output += "{:{align}{width}} {:{nalign}{nwidth}}s\n".format(
            task,
            duration,
            align='<',
            width=max_columns-(duration_width+2),  # 2: space and 's'
            nalign='>',
            nwidth=duration_width,
        )

    print(output)


if __name__ == "__main__":
    args = process_args()
    if args.durations_file and os.path.isfile(args.durations_file):
        duration_data(args.durations_file, args.graph_cache, args.quantiles_file, args.tasklist)
    else:
        plain_data(args.tasklist)

tools/tryselect/preview.py (new)

@@ -0,0 +1,72 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

"""This script is intended to be called through fzf as a preview formatter."""

from __future__ import absolute_import, print_function

import os
import argparse

from util.estimates import duration_summary, task_duration_data


def process_args():
    """Process preview arguments."""
    argparser = argparse.ArgumentParser()
    argparser.add_argument('-s', '--show-estimates', action="store_true")
    argparser.add_argument('-g', '--graph-cache', type=str, default=None)
    argparser.add_argument('-c', '--cache_dir', type=str, default=None)
    argparser.add_argument('tasklist', type=str)
    return argparser.parse_args()


def plain_display(tasklist):
    """Original preview window display."""
    print("\n".join(sorted(s.strip("'") for s in tasklist.split())))


def duration_display(graph_cache_file, tasklist, cache_dir):
    """Preview window display with task durations + metadata."""
    tasklist = [t.strip("'") for t in tasklist.split()]
    durations = duration_summary(graph_cache_file, tasklist, cache_dir)

    output = ""
    max_columns = int(os.environ['FZF_PREVIEW_COLUMNS'])

    output += "\nSelected tasks take {}\n".format(durations["selected_duration"])
    output += "+{} dependencies, total {}\n".format(
        durations["dependency_count"],
        durations["selected_duration"] + durations["dependency_duration"])

    if durations.get("quantile"):
        output += "This is in the top {}% of requests\n".format(durations["quantile"])
    output += "Estimated finish in {} at {}".format(
        durations["wall_duration_seconds"],
        durations["eta_datetime"].strftime("%H:%M"))

    duration_width = 5  # show five numbers at most.
    task_durations = task_duration_data(cache_dir)
    output += "{:>{width}}\n".format("Duration", width=max_columns)
    for task in tasklist:
        duration = int(task_durations.get(task, 0.0))
        output += "{:{align}{width}} {:{nalign}{nwidth}}s\n".format(
            task,
            duration,
            align='<',
            width=max_columns-(duration_width+2),  # 2: space and 's'
            nalign='>',
            nwidth=duration_width,
        )

    print(output)


if __name__ == "__main__":
    args = process_args()
    if args.show_estimates and os.path.isdir(args.cache_dir):
        duration_display(args.graph_cache, args.tasklist, args.cache_dir)
    else:
        plain_display(args.tasklist)
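To illustrate the column layout duration_display produces, here is a minimal standalone sketch, assuming an 80-column window and a hypothetical task label and duration (in the real script the width comes from the FZF_PREVIEW_COLUMNS environment variable that fzf sets for the preview process):

    max_columns = 80    # stand-in for int(os.environ['FZF_PREVIEW_COLUMNS'])
    duration_width = 5  # show five digits of seconds at most

    line = "{:{align}{width}} {:{nalign}{nwidth}}s".format(
        "test-linux64/opt-mochitest-1",  # hypothetical task label
        1234,                            # hypothetical mean duration in seconds
        align='<',
        width=max_columns - (duration_width + 2),  # 2: space and 's' suffix
        nalign='>',
        nwidth=duration_width,
    )
    # The label is left-aligned in 73 columns and the duration is
    # right-aligned in 5 columns with an 's' suffix, so the numbers
    # line up down the right edge of the preview window.
    print(line)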

tools/tryselect/push.py

@@ -11,6 +11,12 @@ import sys
from mozboot.util import get_state_dir
from mozbuild.base import MozbuildObject
from mozversioncontrol import get_repository_object, MissingVCSExtension
from .util.estimates import (
    duration_summary,
    download_task_history_data,
    make_trimmed_taskgraph_cache
)
GIT_CINNABAR_NOT_FOUND = """
Could not detect `git-cinnabar`.
@@ -90,10 +96,53 @@ def generate_try_task_config(method, labels, try_config=None):
    return try_task_config


def task_labels_from_try_config(try_task_config):
    return try_task_config.get("tasks", list())


def display_push_estimates(try_task_config):
    cache_dir = os.path.join(get_state_dir(srcdir=True), 'cache', 'taskgraph')

    graph_cache = None
    dep_cache = None
    target_file = None
    for graph_cache_file in ["full_task_graph", "target_task_graph"]:
        graph_cache = os.path.join(cache_dir, graph_cache_file)
        if os.path.isfile(graph_cache):
            dep_cache = graph_cache.replace("task_graph", "task_dependencies")
            target_file = graph_cache.replace("task_graph", "task_set")
            break

    if not dep_cache:
        return

    download_task_history_data(cache_dir=cache_dir)
    make_trimmed_taskgraph_cache(graph_cache, dep_cache, target_file=target_file)

    durations = duration_summary(
        dep_cache, task_labels_from_try_config(try_task_config), cache_dir)

    print("estimates: Runs {} tasks ({} selected, {} dependencies)".format(
        durations["dependency_count"] + durations["selected_count"],
        durations["selected_count"],
        durations["dependency_count"])
    )
    print("estimates: Total task duration {}".format(
        durations["dependency_duration"] + durations["selected_duration"]
    ))
    print("estimates: In the {}% percentile".format(durations["quantile"]))
    print("estimates: Should take about {} (Finished around {})".format(
        durations["wall_duration_seconds"],
        durations["eta_datetime"].strftime("%Y-%m-%d %H:%M"))
    )


def push_to_try(method, msg, try_task_config=None,
                push=True, closed_tree=False, files_to_change=None):
    check_working_directory(push)

    display_push_estimates(try_task_config)

    # Format the commit message
    closed_tree_string = " ON A CLOSED TREE" if closed_tree else ""
    commit_message = ('%s%s\n\nPushed via `mach try %s`' %
tools/tryselect/selectors/fuzzy.py

@@ -11,9 +11,6 @@ import subprocess
import sys
from distutils.spawn import find_executable
from distutils.version import StrictVersion
from datetime import datetime, timedelta
import requests
import json
from mozbuild.base import MozbuildObject
from mozboot.util import get_state_dir
@@ -22,21 +19,14 @@ from mozterm import Terminal
from ..cli import BaseTryParser
from ..tasks import generate_tasks, filter_tasks_by_paths
from ..push import check_working_directory, push_to_try, generate_try_task_config
from ..util.estimates import download_task_history_data, make_trimmed_taskgraph_cache
terminal = Terminal()
here = os.path.abspath(os.path.dirname(__file__))
build = MozbuildObject.from_environment(cwd=here)
PREVIEW_SCRIPT = os.path.join(build.topsrcdir, 'tools/tryselect/formatters/preview.py')
TASK_DURATION_URL = 'https://storage.googleapis.com/mozilla-mach-data/task_duration_history.json'
GRAPH_QUANTILES_URL = 'https://storage.googleapis.com/mozilla-mach-data/machtry_quantiles.csv'
TASK_DURATION_CACHE = os.path.join(get_state_dir(
    srcdir=True), 'cache', 'task_duration_history.json')
GRAPH_QUANTILE_CACHE = os.path.join(get_state_dir(
    srcdir=True), 'cache', 'graph_quantile_cache.csv')
TASK_DURATION_TAG_FILE = os.path.join(get_state_dir(
    srcdir=True), 'cache', 'task_duration_tag.json')
PREVIEW_SCRIPT = os.path.join(build.topsrcdir, 'tools/tryselect/preview.py')
# Some tasks show up in the target task set, but are either special cases
# or uncommon enough that they should only be selectable with --full.
@@ -118,87 +108,6 @@ fzf_header_shortcuts = [
]
def check_downloaded_history():
    if not os.path.isfile(TASK_DURATION_TAG_FILE):
        return False

    try:
        with open(TASK_DURATION_TAG_FILE) as f:
            duration_tags = json.load(f)
        download_date = datetime.strptime(duration_tags.get('download_date'), '%Y-%M-%d')
        if download_date < datetime.now() - timedelta(days=30):
            return False
    except (IOError, ValueError):
        return False

    if not os.path.isfile(TASK_DURATION_CACHE):
        return False
    if not os.path.isfile(GRAPH_QUANTILE_CACHE):
        return False

    return True


def download_task_history_data():
    """Fetch task duration data exported from BigQuery."""
    if check_downloaded_history():
        return

    try:
        os.unlink(TASK_DURATION_TAG_FILE)
        os.unlink(TASK_DURATION_CACHE)
        os.unlink(GRAPH_QUANTILE_CACHE)
    except OSError:
        print("No existing task history to clean up.")

    try:
        r = requests.get(TASK_DURATION_URL, stream=True)
    except requests.exceptions.RequestException as exc:
        # This is fine, the durations just won't be in the preview window.
        print("Error fetching task duration cache from {}: {}".format(TASK_DURATION_URL, exc))
        return

    # The data retrieved from google storage is a newline-separated
    # list of json entries, which Python's json module can't parse.
    duration_data = list()
    for line in r.content.splitlines():
        duration_data.append(json.loads(line))
    with open(TASK_DURATION_CACHE, 'w') as f:
        json.dump(duration_data, f, indent=4)

    try:
        r = requests.get(GRAPH_QUANTILES_URL, stream=True)
    except requests.exceptions.RequestException as exc:
        # This is fine, the percentile just won't be in the preview window.
        print("Error fetching task group percentiles from {}: {}".format(GRAPH_QUANTILES_URL, exc))
        return

    with open(GRAPH_QUANTILE_CACHE, 'w') as f:
        f.write(r.content)

    with open(TASK_DURATION_TAG_FILE, 'w') as f:
        json.dump({
            'download_date': datetime.now().strftime('%Y-%m-%d')
        }, f, indent=4)


def make_trimmed_taskgraph_cache(graph_cache, dep_cache):
    """Trim the taskgraph cache used for dependencies.

    Speeds up the fzf preview window to less human-perceptible
    ranges."""
    if not os.path.isfile(graph_cache):
        return

    with open(graph_cache) as f:
        graph = json.load(f)
    graph = {name: list(defn['dependencies'].values()) for name, defn in graph.items()}
    with open(dep_cache, 'w') as f:
        json.dump(graph, f, indent=4)


class FuzzyParser(BaseTryParser):
    name = 'fuzzy'
    arguments = [
@@ -386,17 +295,20 @@ def run(update=False, query=None, intersect_query=None, try_config=None, full=Fa
    tg = generate_tasks(parameters, full)
    all_tasks = sorted(tg.tasks.keys())

    # graph_cache is created by generate_tasks; recreate the path to that file.
    cache_dir = os.path.join(get_state_dir(srcdir=True), 'cache', 'taskgraph')
    if full:
        graph_cache = os.path.join(cache_dir, 'full_task_graph')
        dep_cache = os.path.join(cache_dir, 'full_task_dependencies')
        target_set = os.path.join(cache_dir, 'full_task_set')
    else:
        graph_cache = os.path.join(cache_dir, 'target_task_graph')
        dep_cache = os.path.join(cache_dir, 'target_task_dependencies')
        target_set = os.path.join(cache_dir, 'target_task_set')

    if show_estimates:
        download_task_history_data()
        make_trimmed_taskgraph_cache(graph_cache, dep_cache)
        download_task_history_data(cache_dir=cache_dir)
        make_trimmed_taskgraph_cache(graph_cache, dep_cache, target_file=target_set)

    if not full:
        all_tasks = filter(filter_target_task, all_tasks)
@@ -415,10 +327,10 @@ def run(update=False, query=None, intersect_query=None, try_config=None, full=Fa
        '--print-query',
    ]

    if show_estimates and os.path.isfile(TASK_DURATION_CACHE):
    if show_estimates:
        base_cmd.extend([
            '--preview', 'python {} -g {} -d {} -q {} "{{+}}"'.format(
                PREVIEW_SCRIPT, dep_cache, TASK_DURATION_CACHE, GRAPH_QUANTILE_CACHE),
            '--preview', 'python {} -g {} -s -c {} "{{+}}"'.format(
                PREVIEW_SCRIPT, dep_cache, cache_dir),
        ])
    else:
        base_cmd.extend([
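Substituting PREVIEW_SCRIPT and the cache paths, the preview command handed to fzf in the non-full case resembles the following (paths abbreviated for illustration; fzf expands {+} to the currently selected task labels and exports FZF_PREVIEW_COLUMNS to the preview process):

    --preview 'python tools/tryselect/preview.py -g <cache_dir>/target_task_dependencies -s -c <cache_dir> "{+}"'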

tools/tryselect/util/estimates.py (new)

@@ -0,0 +1,218 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

from __future__ import absolute_import, print_function

import os
import requests
import json
from datetime import datetime, timedelta

TASK_DURATION_URL = 'https://storage.googleapis.com/mozilla-mach-data/task_duration_history.json'
GRAPH_QUANTILES_URL = 'https://storage.googleapis.com/mozilla-mach-data/machtry_quantiles.csv'
TASK_DURATION_CACHE = 'task_duration_history.json'
GRAPH_QUANTILE_CACHE = 'graph_quantile_cache.csv'
TASK_DURATION_TAG_FILE = 'task_duration_tag.json'


def check_downloaded_history(tag_file, duration_cache, quantile_cache):
    if not os.path.isfile(tag_file):
        return False

    try:
        with open(tag_file) as f:
            duration_tags = json.load(f)
        download_date = datetime.strptime(duration_tags.get('download_date'), '%Y-%m-%d')
        if download_date < datetime.now() - timedelta(days=30):
            return False
    except (IOError, ValueError):
        return False

    if not os.path.isfile(duration_cache):
        return False
    if not os.path.isfile(quantile_cache):
        return False

    return True


def download_task_history_data(cache_dir):
    """Fetch task duration data exported from BigQuery."""
    task_duration_cache = os.path.join(cache_dir, TASK_DURATION_CACHE)
    task_duration_tag_file = os.path.join(cache_dir, TASK_DURATION_TAG_FILE)
    graph_quantile_cache = os.path.join(cache_dir, GRAPH_QUANTILE_CACHE)

    if check_downloaded_history(task_duration_tag_file, task_duration_cache, graph_quantile_cache):
        return

    try:
        os.unlink(task_duration_tag_file)
        os.unlink(task_duration_cache)
        os.unlink(graph_quantile_cache)
    except OSError:
        print("No existing task history to clean up.")

    try:
        r = requests.get(TASK_DURATION_URL, stream=True)
    except requests.exceptions.RequestException as exc:
        # This is fine, the durations just won't be in the preview window.
        print("Error fetching task duration cache from {}: {}".format(TASK_DURATION_URL, exc))
        return

    # The data retrieved from google storage is a newline-separated
    # list of json entries, which Python's json module can't parse.
    duration_data = list()
    for line in r.content.splitlines():
        duration_data.append(json.loads(line))
    with open(task_duration_cache, 'w') as f:
        json.dump(duration_data, f, indent=4)

    try:
        r = requests.get(GRAPH_QUANTILES_URL, stream=True)
    except requests.exceptions.RequestException as exc:
        # This is fine, the percentile just won't be in the preview window.
        print("Error fetching task group percentiles from {}: {}".format(GRAPH_QUANTILES_URL, exc))
        return

    with open(graph_quantile_cache, 'w') as f:
        f.write(r.content)

    with open(task_duration_tag_file, 'w') as f:
        json.dump({
            'download_date': datetime.now().strftime('%Y-%m-%d')
        }, f, indent=4)


def make_trimmed_taskgraph_cache(graph_cache, dep_cache, target_file=None):
    """Trim the taskgraph cache used for dependencies.

    Speeds up the fzf preview window to less human-perceptible
    ranges."""
    if not os.path.isfile(graph_cache):
        return

    target_task_set = set()
    if target_file:
        with open(target_file) as f:
            target_task_set = set(json.load(f).keys())

    with open(graph_cache) as f:
        graph = json.load(f)
    graph = {
        name: list(defn['dependencies'].values())
        for name, defn in graph.items()
        if name in target_task_set
    }
    with open(dep_cache, 'w') as f:
        json.dump(graph, f, indent=4)


def find_all_dependencies(graph, tasklist):
    all_dependencies = dict()

    def find_dependencies(task):
        dependencies = set()
        if task in all_dependencies:
            return all_dependencies[task]
        if task not in graph:
            # Don't add tasks (and so durations) for
            # things optimised out.
            return dependencies
        dependencies.add(task)
        for dep in graph.get(task, list()):
            all_dependencies[dep] = find_dependencies(dep)
            dependencies.update(all_dependencies[dep])
        return dependencies

    full_deps = set()
    for task in tasklist:
        full_deps.update(find_dependencies(task))

    # Since these have been asked for, they're not inherited dependencies.
    return sorted(full_deps - set(tasklist))


def find_longest_path(graph, tasklist, duration_data):
    dep_durations = dict()

    def find_dependency_durations(task):
        if task in dep_durations:
            return dep_durations[task]

        durations = [find_dependency_durations(dep)
                     for dep in graph.get(task, list())]
        durations.append(0.0)
        md = max(durations) + duration_data.get(task, 0.0)
        dep_durations[task] = md
        return md

    longest_paths = [find_dependency_durations(task) for task in tasklist]
    return max(longest_paths)


def determine_quantile(quantiles_file, duration):
    duration = duration.total_seconds()

    with open(quantiles_file) as f:
        f.readline()  # skip header
        boundaries = [float(l.strip()) for l in f.readlines()]
        boundaries.sort()

    for i, v in enumerate(boundaries):
        if duration < v:
            break
    # In case we weren't given 100 elements
    return int(100 * i / len(boundaries))


def task_duration_data(cache_dir):
    with open(os.path.join(cache_dir, TASK_DURATION_CACHE)) as f:
        durations = json.load(f)
    return {d['name']: d['mean_duration_seconds'] for d in durations}


def duration_summary(graph_cache_file, tasklist, cache_dir):
    durations = task_duration_data(cache_dir)

    graph = dict()
    if graph_cache_file:
        with open(graph_cache_file) as f:
            graph = json.load(f)
    dependencies = find_all_dependencies(graph, tasklist)
    longest_path = find_longest_path(graph, tasklist, durations)

    dependency_duration = 0.0
    for task in dependencies:
        dependency_duration += int(durations.get(task, 0.0))

    total_requested_duration = 0.0
    for task in tasklist:
        duration = int(durations.get(task, 0.0))
        total_requested_duration += duration

    output = dict()

    total_requested_duration = timedelta(seconds=total_requested_duration)
    total_dependency_duration = timedelta(seconds=dependency_duration)

    output["selected_duration"] = total_requested_duration
    output["dependency_duration"] = total_dependency_duration
    output["dependency_count"] = len(dependencies)
    output["selected_count"] = len(tasklist)

    quantile = None
    graph_quantile_cache = os.path.join(cache_dir, GRAPH_QUANTILE_CACHE)
    if os.path.isfile(graph_quantile_cache):
        quantile = 100 - determine_quantile(graph_quantile_cache,
                                            total_dependency_duration + total_requested_duration)
    if quantile:
        output['quantile'] = quantile

    output["wall_duration_seconds"] = timedelta(seconds=int(longest_path))
    output["eta_datetime"] = datetime.now()+timedelta(seconds=longest_path)
    # (datetime.now()+timedelta(seconds=longest_path)).strftime("%H:%M")

    return output
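To make the estimate math concrete, here is a toy example of the trimmed dependency graph these helpers consume, with hypothetical task labels and durations:

    # Hypothetical trimmed graph, in the shape make_trimmed_taskgraph_cache
    # writes: task label -> list of dependency labels.
    graph = {
        "test-a": ["build-x"],
        "test-b": ["build-x"],
        "build-x": [],
    }
    # Hypothetical mean durations in seconds, in the shape task_duration_data returns.
    durations = {"test-a": 600.0, "test-b": 300.0, "build-x": 1800.0}

    # find_all_dependencies(graph, ["test-a", "test-b"]) == ["build-x"]
    #   build-x is pulled in by both tests but was not explicitly selected.
    # find_longest_path(graph, ["test-a", "test-b"], durations) == 2400.0
    #   The critical path is build-x (1800s) then test-a (600s); test-a and
    #   test-b run in parallel once build-x finishes, so the wall-clock
    #   estimate is 2400s rather than the 2700s total of all three tasks.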