Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: new ta tasks #976

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
3 changes: 2 additions & 1 deletion requirements.in
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
https://github.com/codecov/test-results-parser/archive/c840502d1b4dd7d05b2efc2c1328affaf2acd27c.tar.gz#egg=test-results-parser
https://github.com/codecov/test-results-parser/archive/1117e755a36abbecf5a93f3cc3a5e94d918ef895.tar.gz#egg=test-results-parser
https://github.com/codecov/shared/archive/2674ae99811767e63151590906691aed4c5ce1f9.tar.gz#egg=shared
https://github.com/codecov/timestring/archive/d37ceacc5954dff3b5bd2f887936a98a668dda42.tar.gz#egg=timestring
asgiref>=3.7.2
Expand All @@ -18,6 +18,7 @@ grpcio>=1.66.2
httpx
jinja2>=3.1.3
lxml>=5.3.0
msgpack>=1.1.0
mock
multidict>=6.1.0
openai
Expand Down
4 changes: 3 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,8 @@ mock==4.0.3
# via -r requirements.in
monotonic==1.5
# via analytics-python
msgpack==1.1.0
# via -r requirements.in
multidict==6.1.0
# via
# -r requirements.in
Expand Down Expand Up @@ -366,7 +368,7 @@ statsd==3.3.0
# via -r requirements.in
stripe==9.6.0
# via -r requirements.in
test-results-parser @ https://github.com/codecov/test-results-parser/archive/c840502d1b4dd7d05b2efc2c1328affaf2acd27c.tar.gz#egg=test-results-parser
test-results-parser @ https://github.com/codecov/test-results-parser/archive/1117e755a36abbecf5a93f3cc3a5e94d918ef895.tar.gz#egg=test-results-parser
# via -r requirements.in
text-unidecode==1.3
# via faker
Expand Down
2 changes: 2 additions & 0 deletions rollouts/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,5 @@
SHOW_IMPACT_ANALYSIS_DEPRECATION_MSG = Feature(
"show_impact_analysis_deprecation_message"
)

NEW_TA_TASKS = Feature("new_ta_tasks")
3 changes: 2 additions & 1 deletion services/processing/flake_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ def process_flake_for_repo_commit(
):
uploads = ReportSession.objects.filter(
report__report_type=CommitReport.ReportType.TEST_RESULTS.value,
report__commit__repository__repoid=repo_id,
report__commit__commitid=commit_id,
Comment on lines +25 to 26
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe commit_id already uniquely identifies the report, so no need for an additional repository join.

state="processed",
state__in=["processed", "v2_finished"],
).all()

curr_flakes = fetch_curr_flakes(repo_id)
Expand Down
327 changes: 327 additions & 0 deletions services/ta_finishing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,327 @@
from datetime import date, datetime
from typing import Any, Literal, TypedDict

from msgpack import unpackb
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.orm import Session

from database.models import (
DailyTestRollup,
RepositoryFlag,
Test,
TestFlagBridge,
TestInstance,
Upload,
)
from services.redis import get_redis_connection
from services.test_results import generate_flags_hash, generate_test_id


class DailyTotals(TypedDict):
test_id: str
repoid: int
pass_count: int
fail_count: int
skip_count: int
flaky_fail_count: int
branch: str
date: date
latest_run: datetime
commits_where_fail: list[str]
last_duration_seconds: float
avg_duration_seconds: float


def persist_intermediate_results(
db_session: Session,
repoid: int,
commitid: str,
branch: str | None,
uploads: dict[int, Upload],
flaky_test_set: set[str],
) -> None:
tests_to_write: dict[str, dict[str, Any]] = {}
test_instances_to_write: list[dict[str, Any]] = []
daily_totals: dict[str, DailyTotals] = dict()
test_flag_bridge_data: list[dict] = []

for upload in uploads.values():
redis_client = get_redis_connection()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you move this call out of the loop?


intermediate_key = f"ta/intermediate/{repoid}/{commitid}/{upload.id}"
msgpacked_intermediate_result = redis_client.get(intermediate_key)
if msgpacked_intermediate_result is None:
continue

Check warning on line 54 in services/ta_finishing.py

View check run for this annotation

Codecov Notifications / codecov/patch

services/ta_finishing.py#L54

Added line #L54 was not covered by tests

intermediate_result = unpackb(msgpacked_intermediate_result)

repo_flag_ids = get_repo_flag_ids(db_session, repoid, upload.flag_names)

for parsed_junit in intermediate_result:
framework = parsed_junit["framework"]
for testrun in parsed_junit["testruns"]:
modify_structures(
tests_to_write,
test_instances_to_write,
test_flag_bridge_data,
daily_totals,
testrun,
upload,
repoid,
branch,
commitid,
repo_flag_ids,
flaky_test_set,
framework,
)

if len(tests_to_write) > 0:
save_tests(db_session, tests_to_write)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe I commented on this already someplace else?

As you never clear the tests_to_write, having multiple intermediate_results means you are duplicating these insert statements with increasingly more entries.


if len(test_flag_bridge_data) > 0:
save_test_flag_bridges(db_session, test_flag_bridge_data)

Check warning on line 82 in services/ta_finishing.py

View check run for this annotation

Codecov Notifications / codecov/patch

services/ta_finishing.py#L82

Added line #L82 was not covered by tests

if len(daily_totals) > 0:
save_daily_test_rollups(db_session, daily_totals)

if len(test_instances_to_write) > 0:
save_test_instances(db_session, test_instances_to_write)

upload.state = "v2_persisted"
db_session.commit()

redis_client.delete(intermediate_key)


def get_repo_flag_ids(db_session: Session, repoid: int, flags: list[str]) -> set[int]:
if not flags:
return set()

return set(

Check warning on line 100 in services/ta_finishing.py

View check run for this annotation

Codecov Notifications / codecov/patch

services/ta_finishing.py#L100

Added line #L100 was not covered by tests
db_session.query(RepositoryFlag.id_)
.filter(
RepositoryFlag.repository_id == repoid,
RepositoryFlag.flag_name.in_(flags),
)
.all()
)


def modify_structures(
tests_to_write: dict[str, dict[str, Any]],
test_instances_to_write: list[dict[str, Any]],
test_flag_bridge_data: list[dict],
daily_totals: dict[str, DailyTotals],
testrun: dict[str, Any],
upload: Upload,
repoid: int,
branch: str | None,
commitid: str,
repo_flag_ids: set[int],
flaky_test_set: set[str],
framework: str,
):
flags_hash = generate_flags_hash(upload.flag_names)
test_id = generate_test_id(
repoid,
testrun["testsuite"],
testrun["name"],
flags_hash,
)

test = generate_test_dict(test_id, repoid, testrun, flags_hash, framework)
tests_to_write[test_id] = test

test_instance = generate_test_instance_dict(
test_id, upload, testrun, commitid, branch, repoid
)
test_instances_to_write.append(test_instance)

if repo_flag_ids:
test_flag_bridge_data.extend(

Check warning on line 141 in services/ta_finishing.py

View check run for this annotation

Codecov Notifications / codecov/patch

services/ta_finishing.py#L141

Added line #L141 was not covered by tests
{"test_id": test_id, "flag_id": flag_id} for flag_id in repo_flag_ids
)

if test["id"] in daily_totals:
update_daily_totals(

Check warning on line 146 in services/ta_finishing.py

View check run for this annotation

Codecov Notifications / codecov/patch

services/ta_finishing.py#L146

Added line #L146 was not covered by tests
daily_totals,
test["id"],
testrun["duration"],
testrun["outcome"],
)
else:
create_daily_totals(
daily_totals,
test_id,
repoid,
testrun["duration"],
testrun["outcome"],
branch,
commitid,
flaky_test_set,
)


def generate_test_dict(
test_id: str,
repoid: int,
testrun: dict[str, Any],
flags_hash: str,
framework: str,
) -> dict[str, Any]:
return {
"id": test_id,
"repoid": repoid,
"name": f"{testrun['classname']}\x1f{testrun['name']}",
"testsuite": testrun["testsuite"],
"flags_hash": flags_hash,
"framework": framework,
"filename": testrun["filename"],
"computed_name": testrun["computed_name"],
}


def generate_test_instance_dict(
test_id: str,
upload: Upload,
testrun: dict[str, Any],
commitid: str,
branch: str | None,
repoid: int,
) -> dict[str, Any]:
return {
"test_id": test_id,
"upload_id": upload.id,
"duration_seconds": testrun["duration"],
"outcome": testrun["outcome"],
"failure_message": testrun["failure_message"],
"commitid": commitid,
"branch": branch,
"reduced_error_id": None,
"repoid": repoid,
}


def update_daily_totals(
daily_totals: dict,
test_id: str,
duration_seconds: float,
outcome: Literal["pass", "failure", "error", "skip"],
):
daily_totals[test_id]["last_duration_seconds"] = duration_seconds

Check warning on line 211 in services/ta_finishing.py

View check run for this annotation

Codecov Notifications / codecov/patch

services/ta_finishing.py#L211

Added line #L211 was not covered by tests

# logic below is a little complicated but we're basically doing:

# (old_avg * num of values used to compute old avg) + new value
# -------------------------------------------------------------
# num of values used to compute old avg + 1
daily_totals[test_id]["avg_duration_seconds"] = (

Check warning on line 218 in services/ta_finishing.py

View check run for this annotation

Codecov Notifications / codecov/patch

services/ta_finishing.py#L218

Added line #L218 was not covered by tests
daily_totals[test_id]["avg_duration_seconds"]
* (daily_totals[test_id]["pass_count"] + daily_totals[test_id]["fail_count"])
+ duration_seconds
) / (daily_totals[test_id]["pass_count"] + daily_totals[test_id]["fail_count"] + 1)

if outcome == "pass":
daily_totals[test_id]["pass_count"] += 1
elif outcome == "failure" or outcome == "error":
daily_totals[test_id]["fail_count"] += 1
elif outcome == "skip":
daily_totals[test_id]["skip_count"] += 1

Check warning on line 229 in services/ta_finishing.py

View check run for this annotation

Codecov Notifications / codecov/patch

services/ta_finishing.py#L224-L229

Added lines #L224 - L229 were not covered by tests


def create_daily_totals(
daily_totals: dict,
test_id: str,
repoid: int,
duration_seconds: float,
outcome: Literal["pass", "failure", "error", "skip"],
branch: str | None,
commitid: str,
flaky_test_set: set[str],
):
daily_totals[test_id] = {
"test_id": test_id,
"repoid": repoid,
"last_duration_seconds": duration_seconds,
"avg_duration_seconds": duration_seconds,
"pass_count": 1 if outcome == "pass" else 0,
"fail_count": 1 if outcome == "failure" or outcome == "error" else 0,
"skip_count": 1 if outcome == "skip" else 0,
"flaky_fail_count": 1
if test_id in flaky_test_set and (outcome == "failure" or outcome == "error")
else 0,
"branch": branch,
"date": date.today(),
"latest_run": datetime.now(),
"commits_where_fail": [commitid]
if (outcome == "failure" or outcome == "error")
else [],
}


def save_tests(db_session: Session, tests_to_write: dict[str, dict[str, Any]]):
test_data = sorted(
tests_to_write.values(),
key=lambda x: str(x["id"]),
)

test_insert = insert(Test.__table__).values(test_data)
insert_on_conflict_do_update = test_insert.on_conflict_do_update(
index_elements=["repoid", "name", "testsuite", "flags_hash"],
set_={
"framework": test_insert.excluded.framework,
"computed_name": test_insert.excluded.computed_name,
"filename": test_insert.excluded.filename,
},
)
db_session.execute(insert_on_conflict_do_update)
db_session.commit()


def save_test_flag_bridges(db_session: Session, test_flag_bridge_data: list[dict]):
insert_on_conflict_do_nothing_flags = (

Check warning on line 282 in services/ta_finishing.py

View check run for this annotation

Codecov Notifications / codecov/patch

services/ta_finishing.py#L282

Added line #L282 was not covered by tests
insert(TestFlagBridge.__table__)
.values(test_flag_bridge_data)
.on_conflict_do_nothing(index_elements=["test_id", "flag_id"])
)
db_session.execute(insert_on_conflict_do_nothing_flags)
db_session.commit()

Check warning on line 288 in services/ta_finishing.py

View check run for this annotation

Codecov Notifications / codecov/patch

services/ta_finishing.py#L287-L288

Added lines #L287 - L288 were not covered by tests


def save_daily_test_rollups(db_session: Session, daily_rollups: dict[str, DailyTotals]):
sorted_rollups = sorted(daily_rollups.values(), key=lambda x: str(x["test_id"]))
rollup_table = DailyTestRollup.__table__
stmt = insert(rollup_table).values(sorted_rollups)
stmt = stmt.on_conflict_do_update(
index_elements=[
"repoid",
"branch",
"test_id",
"date",
],
set_={
"last_duration_seconds": stmt.excluded.last_duration_seconds,
"avg_duration_seconds": (
rollup_table.c.avg_duration_seconds
* (rollup_table.c.pass_count + rollup_table.c.fail_count)
+ stmt.excluded.avg_duration_seconds
)
/ (rollup_table.c.pass_count + rollup_table.c.fail_count + 1),
"latest_run": stmt.excluded.latest_run,
"pass_count": rollup_table.c.pass_count + stmt.excluded.pass_count,
"skip_count": rollup_table.c.skip_count + stmt.excluded.skip_count,
"fail_count": rollup_table.c.fail_count + stmt.excluded.fail_count,
"flaky_fail_count": rollup_table.c.flaky_fail_count
+ stmt.excluded.flaky_fail_count,
"commits_where_fail": rollup_table.c.commits_where_fail
+ stmt.excluded.commits_where_fail,
},
)
db_session.execute(stmt)
db_session.commit()


def save_test_instances(db_session: Session, test_instance_data: list[dict]):
insert_test_instances = insert(TestInstance.__table__).values(test_instance_data)
db_session.execute(insert_test_instances)
db_session.commit()
2 changes: 2 additions & 0 deletions tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@
from tasks.sync_repo_languages_gql import sync_repo_languages_gql_task
from tasks.sync_repos import sync_repos_task
from tasks.sync_teams import sync_teams_task
from tasks.ta_finisher import ta_finisher_task
from tasks.ta_processor import ta_processor_task
from tasks.test_results_finisher import test_results_finisher_task
from tasks.test_results_processor import test_results_processor_task
from tasks.timeseries_backfill import (
Expand Down
Loading
Loading