From be41f0483cb2e9b7312487efc517ce50957d00b4 Mon Sep 17 00:00:00 2001 From: Tyler Burton Date: Tue, 12 Nov 2024 12:42:48 -0600 Subject: [PATCH] cleans up routes and makes facets more flexible --- app/routes.py | 72 ++-- database/interface.py | 68 +-- docker-compose.yml | 1 + harvester/harvest.py | 401 ++++++++++-------- harvester/lib/load_manager.py | 5 +- harvester/utils/ckan_utils.py | 19 +- harvester/utils/general_utils.py | 18 + scripts/load-test.py | 2 +- tests/conftest.py | 5 +- tests/fixtures.json | 354 ++++++++-------- tests/integration/app/test_load_manager.py | 24 +- tests/integration/app/test_login_required.py | 177 ++++---- tests/integration/app/test_routes.py | 29 ++ tests/integration/database/test_db.py | 33 +- tests/integration/harvest/test_ckan_load.py | 68 +-- tests/integration/harvest/test_compare.py | 29 +- .../harvest/test_exception_handling.py | 103 ++--- .../harvest/test_harvest_full_flow.py | 168 +++++++- tests/integration/harvest/test_transform.py | 8 +- tests/unit/test_utils.py | 23 +- 20 files changed, 949 insertions(+), 658 deletions(-) create mode 100644 tests/integration/app/test_routes.py diff --git a/app/routes.py b/app/routes.py index d63ac6a0..46ff7e10 100644 --- a/app/routes.py +++ b/app/routes.py @@ -337,7 +337,6 @@ def add_organization(): ) -@mod.route("/organization/", methods=["GET"]) @mod.route("/organizations/", methods=["GET"]) def view_organizations(): organizations = db.get_all_organizations() @@ -472,16 +471,16 @@ def view_harvest_source_data(source_id: str): count=True, skip_pagination=True, source_id=source.id, - facets=["ckan_id != null"], + facets="ckan_id is not null", ) error_records_count = db.get_harvest_records_by_source( count=True, skip_pagination=True, source_id=source.id, - facets=["status = 'error'"], + facets="status = 'error'", ) - # TODO: wire in paginated jobs query - jobs = db.get_all_harvest_jobs_by_filter({"harvest_source_id": source.id}) + # TODO: wire in paginated jobs htmx refresh ui & route + jobs = db.pget_harvest_jobs(facets=f"harvest_source_id = '{source.id}'") next_job = "N/A" future_jobs = db.get_new_harvest_jobs_by_source_in_future(source.id) if len(future_jobs): @@ -528,7 +527,6 @@ def view_harvest_source_data(source_id: str): return render_template("view_source_data.html", data=data) -@mod.route("/harvest_source/", methods=["GET"]) @mod.route("/harvest_sources/", methods=["GET"]) def view_harvest_sources(): sources = db.get_all_harvest_sources() @@ -745,18 +743,45 @@ def get_harvest_record(record_id): def get_harvest_records(): job_id = request.args.get("harvest_job_id") source_id = request.args.get("harvest_source_id") - page = request.args.get("page") + paginate = request.args.get("paginate", type=bool) + skip_pagination = request.args.get("skip_pagination", type=bool) + count = request.args.get("count", type=bool) + page = request.args.get("page", type=int) + facets = request.args.get("facets", "") if job_id: - records = db.get_harvest_records_by_job(job_id, page) - if not records: - return "No harvest records found for this harvest job", 404 + records = db.get_harvest_records_by_job( + job_id, + page=page, + paginate=paginate, + skip_pagination=skip_pagination, + facets=facets, + ) + elif source_id: - records = db.get_harvest_records_by_source(source_id, page) + records = db.get_harvest_records_by_source( + source_id, + page=page, + paginate=paginate, + skip_pagination=skip_pagination, + count=count, + facets=facets, + ) if not records: return "No harvest records found for this harvest source", 404 else: - records = db.pget_harvest_records(page) - return db._to_dict(records) + records = db.pget_harvest_records( + page=page, + paginate=paginate, + skip_pagination=skip_pagination, + facets=facets, + ) + + if not records: + return "No harvest records found for this query", 404 + elif isinstance(records, int): + return f"{records} records found", 200 + else: + return db._to_dict(records) @mod.route("/harvest_record//raw", methods=["GET"]) @@ -818,27 +843,6 @@ def get_data_sources(): return render_template("get_data_sources.html", sources=source, organizations=org) -## Test interface, will remove later -@mod.route("/delete_all_records", methods=["DELETE"]) -def delete_all_records(): - db.delete_all_harvest_records() - return "All harvest records deleted" - - -## Test interface, will remove later -@mod.route("/add_harvest_job_error", methods=["POST"]) -def add_harvest_job_error(): - db.add_harvest_job_error(request.json) - return "Added harvest job error" - - -## Test interface, will remove later -@mod.route("/add_harvest_record_error", methods=["POST"]) -def add_harvest_record_error(): - err = db.add_harvest_record_error(request.json) - return db._to_dict(err) - - def register_routes(app): app.register_blueprint(mod) app.register_blueprint(user) diff --git a/database/interface.py b/database/interface.py index b552cd3a..224a5ec7 100644 --- a/database/interface.py +++ b/database/interface.py @@ -7,10 +7,12 @@ import ckanapi from ckanapi import RemoteCKAN -from sqlalchemy import create_engine, func, inspect, or_, select, text +from sqlalchemy import create_engine, func, inspect, select, text from sqlalchemy.exc import NoResultFound from sqlalchemy.orm import scoped_session, sessionmaker +from harvester.utils.general_utils import query_filter_builder + from .models import ( HarvestJob, HarvestJobError, @@ -224,8 +226,9 @@ def _clear_harvest_records(): ckan_ids = [record.ckan_id for record in records if record.ckan_id is not None] error_records = [record for record in records if record.status == "error"] - jobs_in_progress = self.get_all_harvest_jobs_by_filter( - {"harvest_source_id": source.id, "status": "in_progress"} + jobs_in_progress = self.pget_harvest_jobs( + facets=f"harvest_source_id = '{source.id}', 'status' = 'in_progress'", + paginate=False, ) # Ensure no jobs are in progress @@ -325,10 +328,6 @@ def add_harvest_job(self, job_data): def get_harvest_job(self, job_id): return self.db.query(HarvestJob).filter_by(id=job_id).first() - def get_all_harvest_jobs_by_filter(self, filter): - harvest_jobs = self.db.query(HarvestJob).filter_by(**filter).all() - return [job for job in harvest_jobs or []] - def get_first_harvest_jobs_by_filter(self, filter): harvest_job = ( self.db.query(HarvestJob) @@ -361,11 +360,6 @@ def get_new_harvest_jobs_by_source_in_future(self, source_id): ) return [job for job in harvest_jobs or []] - def get_harvest_jobs_by_faceted_filter(self, attr, values): - query_list = [getattr(HarvestJob, attr) == value for value in values] - harvest_jobs = self.db.query(HarvestJob).filter(or_(*query_list)).all() - return [job for job in harvest_jobs] - def update_harvest_job(self, job_id, updates): try: job = self.db.get(HarvestJob, job_id) @@ -463,6 +457,7 @@ def add_harvest_record(self, record_data): self.db.rollback() return None + # TODO: should we delete this if it's not used in code. def add_harvest_records(self, records_data: list) -> dict: """ Add many records at once @@ -524,6 +519,22 @@ def get_latest_harvest_records_by_source(self, source_id): return [dict(zip(fields, record)) for record in records] + def get_all_latest_harvest_records_by_source(self, source_id): + # datetimes are returned as datetime objs not strs + sql = text( + f"""SELECT DISTINCT ON (identifier) * + FROM harvest_record + WHERE harvest_source_id = '{source_id}' + ORDER BY identifier, date_created DESC""" + ) + + res = self.db.execute(sql) + + fields = list(res.keys()) + records = res.fetchall() + + return [dict(zip(fields, record)) for record in records] + def close(self): if hasattr(self.db, "remove"): self.db.remove() @@ -602,29 +613,32 @@ def verify_user(self, usr_data): #### PAGINATED QUERIES #### @count @paginate - def pget_harvest_jobs(self, filter=text(""), **kwargs): - return self.db.query(HarvestJob).filter(filter) + def pget_harvest_jobs(self, facets="", **kwargs): + facet_string = query_filter_builder(None, facets) + return self.db.query(HarvestJob).filter(text(facet_string)) @count @paginate - def pget_harvest_records(self, filter=text(""), **kwargs): - return self.db.query(HarvestRecord).filter(filter) + def pget_harvest_records(self, facets="", **kwargs): + return self.db.query(HarvestRecord).filter(text(facets)) @count @paginate - def pget_harvest_job_errors(self, filter=text(""), **kwargs): - return self.db.query(HarvestJobError).filter(filter) + def pget_harvest_job_errors(self, facets="", **kwargs): + return self.db.query(HarvestJobError).filter(text(facets)) @count @paginate - def pget_harvest_record_errors(self, filter=text(""), **kwargs): - return self.db.query(HarvestRecordError).filter(filter) + def pget_harvest_record_errors(self, facets="", **kwargs): + return self.db.query(HarvestRecordError).filter(text(facets)) - #### FACETED BUILDER QUERIES #### - def get_harvest_records_by_job(self, job_id, facets=[], **kwargs): - filter_string = " AND ".join([f"harvest_job_id = '{job_id}'"] + facets) - return self.pget_harvest_records(filter=text(filter_string), **kwargs) + #### FILTERED BUILDER QUERIES #### + def get_harvest_records_by_job(self, job_id, facets="", **kwargs): + facet_string = query_filter_builder(f"harvest_job_id = '{job_id}'", facets) + return self.pget_harvest_records(facets=facet_string, **kwargs) - def get_harvest_records_by_source(self, source_id, facets=[], **kwargs): - filter_string = " AND ".join([f"harvest_source_id = '{source_id}'"] + facets) - return self.pget_harvest_records(filter=text(filter_string), **kwargs) + def get_harvest_records_by_source(self, source_id, facets="", **kwargs): + facet_string = query_filter_builder( + f"harvest_source_id = '{source_id}'", facets + ) + return self.pget_harvest_records(facets=facet_string, **kwargs) diff --git a/docker-compose.yml b/docker-compose.yml index 93930b9a..84e91c12 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -27,6 +27,7 @@ services: retries: 5 transformer: image: ghcr.io/gsa/mdtranslator:latest + restart: always env_file: - .env environment: diff --git a/harvester/harvest.py b/harvester/harvest.py index a0c6a669..bcc3d244 100644 --- a/harvester/harvest.py +++ b/harvester/harvest.py @@ -1,5 +1,3 @@ -# ruff: noqa: F841 -# ruff: noqa: E402 import functools import json import logging @@ -8,13 +6,13 @@ from dataclasses import dataclass, field from datetime import datetime, timezone from pathlib import Path - +from typing import List import requests from boltons.setutils import IndexedSet from ckanapi import RemoteCKAN from jsonschema import Draft202012Validator -sys.path.insert(1, "/".join(os.path.realpath(__file__).split("/")[0:-2])) +from itertools import groupby from harvester import SMTP_CONFIG import smtplib @@ -41,6 +39,8 @@ traverse_waf, ) +sys.path.insert(1, "/".join(os.path.realpath(__file__).split("/")[0:-2])) + # requests data session = requests.Session() # TODD: make sure this timeout config doesn't change all requests! @@ -73,7 +73,7 @@ class HarvestSource: "schema_type", "source_type", "id", # db guuid - "notification_emails" + "notification_emails", ], repr=False, ) @@ -90,7 +90,12 @@ class HarvestSource: # worth it? not sure... # since python 3.7 dicts are insertion ordered so deletions will occur first compare_data: dict = field( - default_factory=lambda: {"delete": set(), "create": set(), "update": set()}, + default_factory=lambda: { + "delete": set(), + "create": set(), + "update": set(), + None: set(), + }, repr=False, ) external_records: dict = field(default_factory=lambda: {}, repr=False) @@ -98,6 +103,7 @@ class HarvestSource: def __post_init__(self) -> None: self._db_interface: HarvesterDBInterface = db_interface + self._validator = Draft202012Validator(self.dataset_schema) self.get_source_info_from_job_id(self.job_id) @property @@ -109,7 +115,19 @@ def db_interface(self) -> HarvesterDBInterface: return self._db_interface @property - def source_attrs(self) -> list: + def validator(self): + return self._validator + + @property + def records(self): + return self._records + + @property + def report(self): + return self._report + + @property + def source_attrs(self) -> List: return self._source_attrs @property @@ -138,7 +156,7 @@ def get_source_info_from_job_id(self, job_id: str) -> None: self.job_id, ) - def internal_records_to_id_hash(self, records: list[dict]) -> None: + def internal_records_to_id_hash(self, records: List[dict]) -> None: for record in records: self.internal_records[record["identifier"]] = Record( self, @@ -150,7 +168,6 @@ def internal_records_to_id_hash(self, records: list[dict]) -> None: ) def get_record_identifier(self, record: dict) -> str: - record_id = "identifier" if self.schema_type == "dcatus1.1" else "url" if record_id not in record: @@ -163,13 +180,12 @@ def get_record_identifier(self, record: dict) -> str: return record_id - def external_records_to_id_hash(self, records: list[dict]) -> None: + def external_records_to_id_hash(self, records: List[dict]) -> None: # ruff: noqa: F841 logger.info("converting harvest records to id: hash") for record in records: try: - identifier = self.get_record_identifier(record) if self.source_type == "document": @@ -188,17 +204,6 @@ def external_records_to_id_hash(self, records: list[dict]) -> None: self.job_id, ) - def prepare_internal_data(self) -> None: - logger.info("retrieving and preparing internal records.") - try: - records = self.db_interface.get_latest_harvest_records_by_source(self.id) - self.internal_records_to_id_hash(records) - except Exception as e: - raise ExtractInternalException( - f"{self.name} {self.url} failed to extract internal records. exiting", - self.job_id, - ) - def prepare_external_data(self) -> None: logger.info("retrieving and preparing external records.") try: @@ -215,6 +220,17 @@ def prepare_external_data(self) -> None: self.job_id, ) + def prepare_internal_data(self) -> None: + logger.info("retrieving and preparing internal records.") + try: + records = self.db_interface.get_latest_harvest_records_by_source(self.id) + self.internal_records_to_id_hash(records) + except Exception as e: + raise ExtractInternalException( + f"{self.name} {self.url} failed to extract internal records. exiting", + self.job_id, + ) + def compare(self) -> None: """Compares records""" # ruff: noqa: F841 @@ -233,6 +249,8 @@ def compare(self) -> None: internal_hash = self.internal_records[i].metadata_hash if external_hash != internal_hash: self.compare_data["update"].add(i) + else: + self.compare_data[None].add(i) except Exception as e: # TODO: do something with 'e' raise CompareException( @@ -240,20 +258,16 @@ def compare(self) -> None: self.job_id, ) - def get_record_changes(self) -> None: - """determine which records needs to be updated, deleted, or created""" - logger.info(f"getting records changes for {self.name} using {self.url}") - self.prepare_external_data() - self.prepare_internal_data() - self.compare() - def write_compare_to_db(self) -> dict: records = [] - for action, ids in self.compare_data.items(): for record_id in ids: if action == "delete": record = self.internal_records[record_id] + elif action == "update": + record = self.external_records[record_id] + record.ckan_id = self.internal_records[record_id].ckan_id + record.ckan_name = self.internal_records[record_id].ckan_name else: record = self.external_records[record_id] @@ -262,132 +276,159 @@ def write_compare_to_db(self) -> dict: else: source_raw = record.metadata["content"] - records.append( - { - "identifier": record.identifier, - "harvest_job_id": record.harvest_source.job_id, - "harvest_source_id": record.harvest_source.id, - "source_hash": record.metadata_hash, - "source_raw": source_raw, - "action": action, - "ckan_id": record.ckan_id, - "ckan_name": record.ckan_name, - } - ) - self.internal_records_lookup_table = self.db_interface.add_harvest_records( - records - ) + # set record action + record.action = action + db_record = { + "identifier": record.identifier, + "harvest_job_id": record.harvest_source.job_id, + "harvest_source_id": record.harvest_source.id, + "source_hash": record.metadata_hash, + "source_raw": source_raw, + "action": action, + "ckan_id": record.ckan_id, + "ckan_name": record.ckan_name, + } + if action is not None: + db_record = self.db_interface.add_harvest_record(db_record) + record.id = db_record.id + records.append(record) + + # set records on new + self._records = records + + def extract_cleanup(self): + self.compare_data = {} + self.internal_records = {} + self.external_records = {} + + def extract(self) -> None: + """determine which records needs to be updated, deleted, or created""" + logger.info(f"getting records changes for {self.name} using {self.url}") + + self.prepare_external_data() + self.prepare_internal_data() + self.compare() + self.write_compare_to_db() + self.extract_cleanup() + + def transform(self) -> None: + logger.info("transforming records") + for record in self.records: + pass + + def validate(self) -> None: + logger.info("validating records") + for record in self.records: + try: + record.validate() + except ValidationException as e: + pass - def synchronize_records(self) -> None: + def load(self) -> None: """runs the delete, update, and create - self.compare can be empty because there was no harvest source response or there's truly nothing to process """ logger.info("synchronizing records") - for action, ids in self.compare_data.items(): - for i in ids: - try: - if action == "delete": - # we don't actually create a Record instance for deletions - # so creating it here as a sort of acknowledgement - self.external_records[i] = Record( - self, - self.internal_records[i].identifier, - _ckan_id=self.internal_records[i].ckan_id, - ) - self.external_records[i].action = action - try: - self.external_records[i].delete_record() - self.external_records[i].update_self_in_db() - except Exception as e: - self.external_records[i].status = "error" - raise SynchronizeException( - f"failed to {self.external_records[i].action} \ - for {self.external_records[i].identifier} :: \ - {repr(e)}", - self.job_id, - self.internal_records_lookup_table[ - self.external_records[i].identifier - ], - ) - continue - - record = self.external_records[i] - if action == "update": - record.ckan_id = self.internal_records[i].ckan_id - record.ckan_name = self.internal_records[i].ckan_name - - # no longer setting action in compare so setting it here... - record.action = action - - if self.schema_type != "dcatus1.1": - record.transform() - record.validate() - record.sync() - - except ( - ValidationException, - DCATUSToCKANException, - SynchronizeException, - TransformationException, - ) as e: - pass - - def report(self) -> None: + for record in self.records: + try: + record.sync() + + except ( + DCATUSToCKANException, + SynchronizeException, + TransformationException, + ) as e: + pass + + def do_report(self) -> None: logger.info("report results") - # log our original compare data - logger.info("expected actions to be done") - logger.info({action: len(ids) for action, ids in self.compare_data.items()}) - - # validation count and actual results - actual_results_action = { - "delete": 0, - "update": 0, - "create": 0, - None: 0, + results = { + "action": {"create": 0, "update": 0, "delete": 0, None: 0}, + "status": {"success": 0, "error": 0, None: 0}, + "validity": {True: 0, False: 0}, } - actual_results_status = {"success": 0, "error": 0, None: 0} - validity = {"valid": 0, "invalid": 0, "ignored": 0} - - for record_id, record in self.external_records.items(): - # action - if record.status != "error": - actual_results_action[record.action] += 1 - # status - actual_results_status[record.status] += 1 - # validity - if record.valid: - validity["valid"] += 1 - elif not record.valid: - validity["invalid"] += 1 - else: - validity["not_validated"] += 1 + for key, group in groupby( + self.records, lambda x: x.action if x.status != "error" else False + ): + results["action"][key] = sum(1 for _ in group) + + for key, group in groupby(self.records, lambda x: x.status): + results["status"][key] = sum(1 for _ in group) - # what actually happened? - logger.info("actual actions completed") - logger.info(actual_results_action) + for key, group in groupby(self.records, lambda x: x.valid): + results["validity"][key] = sum(1 for _ in group) - # what actually happened? - logger.info("actual status completed") - logger.info(actual_results_status) + logger.info("actions completed") + logger.info(results["action"]) - # what's our record validity count? - logger.info("validity of the records") - logger.info(validity) + logger.info("status completed") + logger.info(results["status"]) + + logger.info("validity of records") + logger.info(results["validity"]) job_status = { "status": "complete", "date_finished": datetime.now(timezone.utc), - "records_added": actual_results_action["create"], - "records_updated": actual_results_action["update"], - "records_deleted": actual_results_action["delete"], - "records_ignored": actual_results_action[None], - "records_errored": actual_results_status["error"], + "records_added": results["action"]["create"], + "records_updated": results["action"]["update"], + "records_deleted": results["action"]["delete"], + "records_ignored": results["action"][None], + "records_errored": results["status"]["error"], } self.db_interface.update_harvest_job(self.job_id, job_status) + self._report = job_status if hasattr(self, "notification_emails") and self.notification_emails: - self.send_notification_emails(actual_results_action) + self.send_notification_emails(results) + + def restart_job(self): + logger.info(f"restarting failed job for {self.name}") + job = self.db_interface.get_harvest_job(self.job_id) + updated_job = self.db_interface.update_harvest_job( + job.id, {"status": "in_progress"} + ) + print(f"Updated job {updated_job.id} to in_progress") + db_records = [] + for db_record in job.records: + new_record = Record( + self, + db_record.identifier, + json.loads(db_record.source_raw), + db_record.source_hash, + db_record.action, + _status=db_record.status, + _ckan_id=db_record.ckan_id, + _ckan_name=db_record.ckan_name, + _id=db_record.id, + ) + db_records.append(new_record) + self._records = db_records + + def follow_up_job(self): + logger.info(f"kicking off pickup job for {self.name}") + db_records = self.db_interface.get_all_latest_harvest_records_by_source(self.id) + job = self.db_interface.get_harvest_job(self.job_id) + updated_job = self.db_interface.update_harvest_job( + job.id, {"status": "in_progress"} + ) + print(f"Updated job {updated_job.id} to in_progress") + new_records = [] + for db_record in db_records: + new_record = Record( + self, + db_record["identifier"], + json.loads(db_record["source_raw"]), + db_record["source_hash"], + db_record["action"], + _status=db_record["status"], + _ckan_id=db_record["ckan_id"], + _ckan_name=db_record["ckan_name"], + _id=db_record["id"], + ) + new_records.append(new_record) + self._records = new_records def send_notification_emails(self, results: dict) -> None: try: @@ -398,10 +439,11 @@ def send_notification_emails(self, results: dict) -> None: f"The harvest job ({self.job_id}) has been successfully completed.\n" f"You can view the details here: {job_url}\n\n" "Summary of the job:\n" - f"- Records Added: {results['create']}\n" - f"- Records Updated: {results['update']}\n" - f"- Records Deleted: {results['delete']}\n" - f"- Records Ignored: {results[None]}\n\n" + f"- Records Added: {results['action']['create']}\n" + f"- Records Updated: {results['action']['update']}\n" + f"- Records Deleted: {results['action']['delete']}\n" + f"- Records Ignored: {results['action'][None]}\n" + f"- Records Errored: {results['status']['error']}\n\n" "====\n" "You received this email because you subscribed to harvester updates.\n" "Please do not reply to this email, as it is not monitored." @@ -423,8 +465,9 @@ def send_notification_emails(self, results: dict) -> None: msg["Subject"] = subject msg.attach(MIMEText(body, "plain")) - server.sendmail(SMTP_CONFIG["default_sender"], [recipient], - msg.as_string()) + server.sendmail( + SMTP_CONFIG["default_sender"], [recipient], msg.as_string() + ) logger.info(f"Notification email sent to: {recipient}") except Exception as e: @@ -447,6 +490,7 @@ class Record: _ckan_name: str = None _mdt_writer: str = "dcat_us" _mdt_msgs: str = "" + _id: str = None transformed_data: dict = None ckanified_metadata: dict = field(default_factory=lambda: {}) @@ -479,6 +523,14 @@ def ckan_id(self) -> str: def ckan_id(self, value) -> None: self._ckan_id = value + @property + def id(self) -> str: + return self._id + + @id.setter + def id(self, value) -> None: + self._id = value + @property def ckan_name(self) -> str: return self._ckan_name @@ -540,7 +592,7 @@ def validation_msg(self) -> str: @validation_msg.setter def validation_msg(self, value) -> None: if not isinstance(value, str): - raise ValueError("status must be a string") + raise ValueError("validation_msg must be a string") self._validation_msg = value @property @@ -554,7 +606,6 @@ def status(self, value) -> None: self._status = value def transform(self) -> None: - data = { "file": self.metadata["content"], "reader": self.reader_map[self.harvest_source.schema_type], @@ -570,7 +621,7 @@ def transform(self) -> None: raise TransformationException( f"record failed to transform: {self.mdt_msgs}", self.harvest_source.job_id, - self.harvest_source.internal_records_lookup_table[self.identifier], + self.id, ) if 200 <= resp.status_code < 300: @@ -578,24 +629,26 @@ def transform(self) -> None: def validate(self) -> None: logger.info(f"validating {self.identifier}") - # ruff: noqa: F841 - validator = Draft202012Validator(self.harvest_source.dataset_schema) try: + if self.action == "delete": + return + record = ( self.metadata if self.transformed_data is None else self.transformed_data ) - validator.validate(record) + + self.harvest_source.validator.validate(record) self.valid = True except Exception as e: self.status = "error" - self.validation_msg = str(e) # TODO: verify this is what we want + self.validation_msg = str(e.message) self.valid = False raise ValidationException( repr(e), self.harvest_source.job_id, - self.harvest_source.internal_records_lookup_table[self.identifier], + self.id, ) def create_record(self, retry=False): @@ -635,7 +688,7 @@ def update_self_in_db(self) -> bool: data["ckan_name"] = self.ckan_name self.harvest_source.db_interface.update_harvest_record( - self.harvest_source.internal_records_lookup_table[self.identifier], + self.id, data, ) @@ -643,37 +696,44 @@ def ckanify_dcatus(self) -> None: from harvester.utils.ckan_utils import ckanify_dcatus try: - self.ckanified_metadata = ckanify_dcatus(self.metadata, self.harvest_source) + self.ckanified_metadata = ckanify_dcatus( + self.metadata, self.harvest_source, self.id + ) except Exception as e: self.status = "error" - raise DCATUSToCKANException( - repr(e), - self.harvest_source.job_id, - self.harvest_source.internal_records_lookup_table[self.identifier], - ) + raise DCATUSToCKANException(repr(e), self.harvest_source.job_id, self.id) def sync(self) -> None: if self.valid is False: logger.warning(f"{self.identifier} is invalid. bypassing {self.action}") return - self.ckanify_dcatus() + if self.status == "success": + logger.info( + f"{self.identifier} has status 'success'. bypassing {self.action}" + ) + return start = datetime.now(timezone.utc) - + # todo: try: + if self.action == "delete": + self.delete_record() if self.action == "create": + self.ckanify_dcatus() self.create_record() if self.action == "update": + self.ckanify_dcatus() self.update_record() except Exception as e: self.status = "error" raise SynchronizeException( f"failed to {self.action} for {self.identifier} :: {repr(e)}", self.harvest_source.job_id, - self.harvest_source.internal_records_lookup_table[self.identifier], + self.id, ) - self.update_self_in_db() + if self.action is not None: + self.update_self_in_db() logger.info( f"time to {self.action} {self.identifier} \ @@ -684,10 +744,19 @@ def sync(self) -> None: def harvest(jobId): logger.info(f"Harvest job starting for JobId: {jobId}") harvest_source = HarvestSource(jobId) - harvest_source.get_record_changes() - harvest_source.write_compare_to_db() - harvest_source.synchronize_records() - harvest_source.report() + + # extract, compare, and save the results + harvest_source.extract() + + # transform and validate the transform + harvest_source.transform() + harvest_source.validate() + + # sync with CKAN + harvest_source.load() + + # generate harvest job report + harvest_source.do_report() if __name__ == "__main__": diff --git a/harvester/lib/load_manager.py b/harvester/lib/load_manager.py index 3d53c8c1..dbbd0665 100644 --- a/harvester/lib/load_manager.py +++ b/harvester/lib/load_manager.py @@ -129,8 +129,9 @@ def trigger_manual_job(self, source_id): """manual trigger harvest job, takes a source_id""" source = interface.get_harvest_source(source_id) - jobs_in_progress = interface.get_all_harvest_jobs_by_filter( - {"harvest_source_id": source.id, "status": "in_progress"} + jobs_in_progress = interface.pget_harvest_jobs( + facets=f"harvest_source_id = '{source.id}', status = 'in_progress'", + paginate=False, ) if len(jobs_in_progress): return f"Can't trigger harvest. Job {jobs_in_progress[0].id} already in progress." # noqa E501 diff --git a/harvester/utils/ckan_utils.py b/harvester/utils/ckan_utils.py index 4fd11cff..74d08188 100644 --- a/harvester/utils/ckan_utils.py +++ b/harvester/utils/ckan_utils.py @@ -153,7 +153,9 @@ def munge_tag(tag: str) -> str: return tag -def create_ckan_extras(metadata: dict, harvest_source: HarvestSource) -> list[dict]: +def create_ckan_extras( + metadata: dict, harvest_source: HarvestSource, record_id: str +) -> list[dict]: extras = [ "accessLevel", "bureauCode", @@ -165,12 +167,7 @@ def create_ckan_extras(metadata: dict, harvest_source: HarvestSource) -> list[di output = [ {"key": "resource-type", "value": "Dataset"}, - { - "key": "harvest_object_id", - "value": harvest_source.internal_records_lookup_table[ - metadata["identifier"] - ], - }, + {"key": "harvest_object_id", "value": record_id}, { "key": "source_datajson_identifier", # dataset is datajson format or not "value": True, @@ -305,14 +302,18 @@ def simple_transform(metadata: dict, owner_org: str) -> dict: return output -def ckanify_dcatus(metadata: dict, harvest_source: HarvestSource) -> dict: +def ckanify_dcatus( + metadata: dict, harvest_source: HarvestSource, record_id: str +) -> dict: ckanified_metadata = simple_transform(metadata, harvest_source.organization_id) ckanified_metadata["resources"] = create_ckan_resources(metadata) ckanified_metadata["tags"] = ( create_ckan_tags(metadata["keyword"]) if "keyword" in metadata else [] ) - ckanified_metadata["extras"] = create_ckan_extras(metadata, harvest_source) + ckanified_metadata["extras"] = create_ckan_extras( + metadata, harvest_source, record_id + ) return ckanified_metadata diff --git a/harvester/utils/general_utils.py b/harvester/utils/general_utils.py index 6dfd99af..cde1b9b9 100644 --- a/harvester/utils/general_utils.py +++ b/harvester/utils/general_utils.py @@ -123,3 +123,21 @@ def download_waf(files): output.append({"url": file, "content": download_file(file, ".xml")}) return output + + +def query_filter_builder(base, facets): + """Builds filter strings from base and comma separated string of filters + :param base str - base filter query + :param facets str - extra facets + + """ + if base is None: + facet_string = facets.split(",")[0] + facet_list = facets.split(",")[1:] + else: + facet_string = base + facet_list = facets.split(",") + for facet in facet_list: + if facet != "": + facet_string += f" AND {facet}" + return facet_string diff --git a/scripts/load-test.py b/scripts/load-test.py index 4c696e47..30c68169 100644 --- a/scripts/load-test.py +++ b/scripts/load-test.py @@ -20,4 +20,4 @@ harvest_source.get_record_changes() harvest_source.synchronize_records() -harvest_source.report() +harvest_source.do_report() diff --git a/tests/conftest.py b/tests/conftest.py index 2db92860..0b0c9862 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -32,7 +32,9 @@ def default_session_fixture(): with patch("harvester.lib.cf_handler.CloudFoundryClient"), patch( "harvester.lib.cf_handler.TaskManager" - ), patch("app.load_manager.start", lambda: True): + ), patch("harvester.harvest.smtplib"), patch( + "app.load_manager.start", lambda: True + ): yield @@ -379,7 +381,6 @@ def interface_with_multiple_sources( return interface_with_fixture_json -## MISC @pytest.fixture def interface_with_multiple_jobs(interface_no_jobs, source_data_dcatus): statuses = ["new", "in_progress", "complete", "error"] diff --git a/tests/fixtures.json b/tests/fixtures.json index 8a99be54..dadd8223 100644 --- a/tests/fixtures.json +++ b/tests/fixtures.json @@ -1,179 +1,179 @@ { - "organization": [ - { - "name": "Test Org", - "logo": "https://example.com/logo.png", - "id": "d925f84d-955b-4cb7-812f-dcfd6681a18f" - } - ], - "source": [ - { - "id": "2f2652de-91df-4c63-8b53-bfced20b276b", - "name": "Test Source", - "notification_emails": "email@example.com", - "organization_id": "d925f84d-955b-4cb7-812f-dcfd6681a18f", - "frequency": "daily", - "url": "http://localhost:80/dcatus/dcatus.json", - "schema_type": "dcatus1.1", - "source_type": "document" - } - ], - "job": [ - { - "id": "6bce761c-7a39-41c1-ac73-94234c139c76", - "status": "new", - "harvest_source_id": "2f2652de-91df-4c63-8b53-bfced20b276b" - } - ], - "job_error": [ - { - "harvest_job_id": "6bce761c-7a39-41c1-ac73-94234c139c76", - "message": "error reading records from harvest database", - "type": "ExtractInternalException" - } - ], - "record": [ - { - "id": "0779c855-df20-49c8-9108-66359d82b77c", - "identifier": "test_identifier-1", - "harvest_job_id": "6bce761c-7a39-41c1-ac73-94234c139c76", - "harvest_source_id": "2f2652de-91df-4c63-8b53-bfced20b276b", - "action": "create", - "status": "error", - "source_raw": "example data" - }, - { - "id": "c218c965-3670-45c8-bfcd-f852d71ed917", - "identifier": "test_identifier-2", - "harvest_job_id": "6bce761c-7a39-41c1-ac73-94234c139c76", - "harvest_source_id": "2f2652de-91df-4c63-8b53-bfced20b276b", - "action": "create", - "status": "error", - "source_raw": "example data" - }, - { - "id": "e1f603cc-8b6b-483f-beb4-86bda5462b79", - "identifier": "test_identifier-3", - "harvest_job_id": "6bce761c-7a39-41c1-ac73-94234c139c76", - "harvest_source_id": "2f2652de-91df-4c63-8b53-bfced20b276b", - "action": "create", - "status": "error", - "source_raw": "example data" - }, - { - "id": "1c004473-0802-4f22-a16d-7a2d7559719e", - "identifier": "test_identifier-4", - "harvest_job_id": "6bce761c-7a39-41c1-ac73-94234c139c76", - "harvest_source_id": "2f2652de-91df-4c63-8b53-bfced20b276b", - "action": "create", - "status": "error", - "source_raw": "example data" - }, - { - "id": "deb12fa0-d812-4d6e-98f4-d4f7d776c6b3", - "identifier": "test_identifier-5", - "harvest_job_id": "6bce761c-7a39-41c1-ac73-94234c139c76", - "harvest_source_id": "2f2652de-91df-4c63-8b53-bfced20b276b", - "action": "create", - "status": "error", - "source_raw": "example data" - }, - { - "id": "27b5d5d6-808b-4a8c-ae4a-99f118e282dd", - "identifier": "test_identifier-6", - "harvest_job_id": "6bce761c-7a39-41c1-ac73-94234c139c76", - "harvest_source_id": "2f2652de-91df-4c63-8b53-bfced20b276b", - "action": "create", - "status": "error", - "source_raw": "example data" - }, - { - "id": "c232a2ca-6344-4692-adc2-29f618a2eff3", - "identifier": "test_identifier-7", - "harvest_job_id": "6bce761c-7a39-41c1-ac73-94234c139c76", - "harvest_source_id": "2f2652de-91df-4c63-8b53-bfced20b276b", - "action": "create", - "status": "error", - "source_raw": "example data" - }, - { - "id": "95021355-bad0-442b-98e9-475ecd849033", - "identifier": "test_identifier-8", - "harvest_job_id": "6bce761c-7a39-41c1-ac73-94234c139c76", - "harvest_source_id": "2f2652de-91df-4c63-8b53-bfced20b276b", - "action": "create", - "status": "error", - "source_raw": "example data" - }, - { - "id": "09f073b3-00e3-4147-ba69-a5d0fd7ce027", - "identifier": "test_identifier-9", - "harvest_job_id": "6bce761c-7a39-41c1-ac73-94234c139c76", - "harvest_source_id": "2f2652de-91df-4c63-8b53-bfced20b276b", - "action": "create", - "status": "error", - "source_raw": "example data" - }, - { - "id": "97492788-5d62-4feb-8641-6f6692aec026", - "identifier": "test_identifier-10", - "harvest_job_id": "6bce761c-7a39-41c1-ac73-94234c139c76", - "harvest_source_id": "2f2652de-91df-4c63-8b53-bfced20b276b", - "action": "create", - "status": "error", - "source_raw": "example data" - } - ], - "record_error": [ - { - "harvest_record_id": "0779c855-df20-49c8-9108-66359d82b77c", - "message": "record is invalid", - "type": "ValidationException" - }, - { - "harvest_record_id": "c218c965-3670-45c8-bfcd-f852d71ed917", - "message": "record is invalid", - "type": "ValidationException" - }, - { - "harvest_record_id": "e1f603cc-8b6b-483f-beb4-86bda5462b79", - "message": "record is invalid", - "type": "ValidationException" - }, - { - "harvest_record_id": "1c004473-0802-4f22-a16d-7a2d7559719e", - "message": "record is invalid", - "type": "ValidationException" - }, - { - "harvest_record_id": "deb12fa0-d812-4d6e-98f4-d4f7d776c6b3", - "message": "record is invalid", - "type": "ValidationException" - }, - { - "harvest_record_id": "27b5d5d6-808b-4a8c-ae4a-99f118e282dd", - "message": "record is invalid", - "type": "ValidationException" - }, - { - "harvest_record_id": "c232a2ca-6344-4692-adc2-29f618a2eff3", - "message": "record is invalid", - "type": "ValidationException" - }, - { - "harvest_record_id": "95021355-bad0-442b-98e9-475ecd849033", - "message": "record is invalid", - "type": "ValidationException" - }, - { - "harvest_record_id": "09f073b3-00e3-4147-ba69-a5d0fd7ce027", - "message": "record is invalid", - "type": "ValidationException" - }, - { - "harvest_record_id": "97492788-5d62-4feb-8641-6f6692aec026", - "message": "record is invalid", - "type": "ValidationException" - } - ] + "organization": [ + { + "name": "Test Org", + "logo": "https://example.com/logo.png", + "id": "d925f84d-955b-4cb7-812f-dcfd6681a18f" + } + ], + "source": [ + { + "id": "2f2652de-91df-4c63-8b53-bfced20b276b", + "name": "Test Source", + "notification_emails": "email@example.com", + "organization_id": "d925f84d-955b-4cb7-812f-dcfd6681a18f", + "frequency": "daily", + "url": "http://localhost:80/dcatus/dcatus.json", + "schema_type": "dcatus1.1", + "source_type": "document" + } + ], + "job": [ + { + "id": "6bce761c-7a39-41c1-ac73-94234c139c76", + "status": "new", + "harvest_source_id": "2f2652de-91df-4c63-8b53-bfced20b276b" + } + ], + "job_error": [ + { + "harvest_job_id": "6bce761c-7a39-41c1-ac73-94234c139c76", + "message": "error reading records from harvest database", + "type": "ExtractInternalException" + } + ], + "record": [ + { + "id": "0779c855-df20-49c8-9108-66359d82b77c", + "identifier": "test_identifier-1", + "harvest_job_id": "6bce761c-7a39-41c1-ac73-94234c139c76", + "harvest_source_id": "2f2652de-91df-4c63-8b53-bfced20b276b", + "action": "create", + "status": "error", + "source_raw": "{\"title\": \"test-0\", \"identifier\": \"test-0\"}" + }, + { + "id": "c218c965-3670-45c8-bfcd-f852d71ed917", + "identifier": "test_identifier-2", + "harvest_job_id": "6bce761c-7a39-41c1-ac73-94234c139c76", + "harvest_source_id": "2f2652de-91df-4c63-8b53-bfced20b276b", + "action": "create", + "status": "error", + "source_raw": "{\"title\": \"test-1\", \"identifier\": \"test-1\"}" + }, + { + "id": "e1f603cc-8b6b-483f-beb4-86bda5462b79", + "identifier": "test_identifier-3", + "harvest_job_id": "6bce761c-7a39-41c1-ac73-94234c139c76", + "harvest_source_id": "2f2652de-91df-4c63-8b53-bfced20b276b", + "action": "create", + "status": "error", + "source_raw": "{\"title\": \"test-2\", \"identifier\": \"test-2\"}" + }, + { + "id": "1c004473-0802-4f22-a16d-7a2d7559719e", + "identifier": "test_identifier-4", + "harvest_job_id": "6bce761c-7a39-41c1-ac73-94234c139c76", + "harvest_source_id": "2f2652de-91df-4c63-8b53-bfced20b276b", + "action": "create", + "status": "error", + "source_raw": "{\"title\": \"test-3\", \"identifier\": \"test-3\"}" + }, + { + "id": "deb12fa0-d812-4d6e-98f4-d4f7d776c6b3", + "identifier": "test_identifier-5", + "harvest_job_id": "6bce761c-7a39-41c1-ac73-94234c139c76", + "harvest_source_id": "2f2652de-91df-4c63-8b53-bfced20b276b", + "action": "create", + "status": "error", + "source_raw": "{\"title\": \"test-4\", \"identifier\": \"test-4\"}" + }, + { + "id": "27b5d5d6-808b-4a8c-ae4a-99f118e282dd", + "identifier": "test_identifier-6", + "harvest_job_id": "6bce761c-7a39-41c1-ac73-94234c139c76", + "harvest_source_id": "2f2652de-91df-4c63-8b53-bfced20b276b", + "action": "create", + "status": "error", + "source_raw": "{\"title\": \"test-5\", \"identifier\": \"test-5\"}" + }, + { + "id": "c232a2ca-6344-4692-adc2-29f618a2eff3", + "identifier": "test_identifier-7", + "harvest_job_id": "6bce761c-7a39-41c1-ac73-94234c139c76", + "harvest_source_id": "2f2652de-91df-4c63-8b53-bfced20b276b", + "action": "create", + "status": "error", + "source_raw": "{\"title\": \"test-6\", \"identifier\": \"test-6\"}" + }, + { + "id": "95021355-bad0-442b-98e9-475ecd849033", + "identifier": "test_identifier-8", + "harvest_job_id": "6bce761c-7a39-41c1-ac73-94234c139c76", + "harvest_source_id": "2f2652de-91df-4c63-8b53-bfced20b276b", + "action": "create", + "status": "error", + "source_raw": "{\"title\": \"test-7\", \"identifier\": \"test-7\"}" + }, + { + "id": "09f073b3-00e3-4147-ba69-a5d0fd7ce027", + "identifier": "test_identifier-9", + "harvest_job_id": "6bce761c-7a39-41c1-ac73-94234c139c76", + "harvest_source_id": "2f2652de-91df-4c63-8b53-bfced20b276b", + "action": "create", + "status": "error", + "source_raw": "{\"title\": \"test-8\", \"identifier\": \"test-8\"}" + }, + { + "id": "97492788-5d62-4feb-8641-6f6692aec026", + "identifier": "test_identifier-10", + "harvest_job_id": "6bce761c-7a39-41c1-ac73-94234c139c76", + "harvest_source_id": "2f2652de-91df-4c63-8b53-bfced20b276b", + "action": "create", + "status": "error", + "source_raw": "{\"title\": \"test-9\", \"identifier\": \"test-9\"}" + } + ], + "record_error": [ + { + "harvest_record_id": "0779c855-df20-49c8-9108-66359d82b77c", + "message": "record is invalid", + "type": "ValidationException" + }, + { + "harvest_record_id": "c218c965-3670-45c8-bfcd-f852d71ed917", + "message": "record is invalid", + "type": "ValidationException" + }, + { + "harvest_record_id": "e1f603cc-8b6b-483f-beb4-86bda5462b79", + "message": "record is invalid", + "type": "ValidationException" + }, + { + "harvest_record_id": "1c004473-0802-4f22-a16d-7a2d7559719e", + "message": "record is invalid", + "type": "ValidationException" + }, + { + "harvest_record_id": "deb12fa0-d812-4d6e-98f4-d4f7d776c6b3", + "message": "record is invalid", + "type": "ValidationException" + }, + { + "harvest_record_id": "27b5d5d6-808b-4a8c-ae4a-99f118e282dd", + "message": "record is invalid", + "type": "ValidationException" + }, + { + "harvest_record_id": "c232a2ca-6344-4692-adc2-29f618a2eff3", + "message": "record is invalid", + "type": "ValidationException" + }, + { + "harvest_record_id": "95021355-bad0-442b-98e9-475ecd849033", + "message": "record is invalid", + "type": "ValidationException" + }, + { + "harvest_record_id": "09f073b3-00e3-4147-ba69-a5d0fd7ce027", + "message": "record is invalid", + "type": "ValidationException" + }, + { + "harvest_record_id": "97492788-5d62-4feb-8641-6f6692aec026", + "message": "record is invalid", + "type": "ValidationException" + } + ] } diff --git a/tests/integration/app/test_load_manager.py b/tests/integration/app/test_load_manager.py index 3ae96225..b5051924 100644 --- a/tests/integration/app/test_load_manager.py +++ b/tests/integration/app/test_load_manager.py @@ -189,8 +189,9 @@ def test_manual_job_doesnt_affect_scheduled_jobs( assert source_data_dcatus["frequency"] == "daily" assert jobs[0].date_created == datetime.now() + timedelta(days=1) - jobs = interface_no_jobs.get_all_harvest_jobs_by_filter( - {"harvest_source_id": source_data_dcatus["id"]} + source_id = source_data_dcatus["id"] + jobs = interface_no_jobs.pget_harvest_jobs( + facets=f"harvest_source_id = '{source_id}'" ) assert len(jobs) == 2 assert jobs[0].date_created == datetime.now() + timedelta(days=1) @@ -212,8 +213,9 @@ def test_dont_create_new_job_if_job_already_in_progress( load_manager = LoadManager() load_manager.schedule_first_job(source_data_dcatus["id"]) message = load_manager.trigger_manual_job(source_data_dcatus["id"]) - new_job = interface_no_jobs.get_all_harvest_jobs_by_filter( - {"harvest_source_id": source_data_dcatus["id"], "status": "in_progress"} + source_id = source_data_dcatus["id"] + new_job = interface_no_jobs.pget_harvest_jobs( + facets=f"harvest_source_id = '{source_id}', status = 'in_progress'" ) assert message == f"Updated job {new_job[0].id} to in_progress" message = load_manager.trigger_manual_job(source_data_dcatus["id"]) @@ -222,8 +224,8 @@ def test_dont_create_new_job_if_job_already_in_progress( == f"Can't trigger harvest. Job {new_job[0].id} already in progress." ) - jobs = interface_no_jobs.get_all_harvest_jobs_by_filter( - {"harvest_source_id": source_data_dcatus["id"]} + jobs = interface_no_jobs.pget_harvest_jobs( + facets=f"harvest_source_id = '{source_id}'" ) assert len(jobs) == 2 @@ -251,8 +253,9 @@ def test_assert_env_var_changes_task_size( assert start_task_mock.call_args[0][4] == "1536" # clear out in progress jobs - jobs = interface_no_jobs.get_all_harvest_jobs_by_filter( - {"harvest_source_id": source_data_dcatus["id"]} + source_id = source_data_dcatus["id"] + jobs = interface_no_jobs.pget_harvest_jobs( + facets=f"harvest_source_id = '{source_id}'" ) interface_no_jobs.delete_harvest_job(jobs[0].id) @@ -283,8 +286,9 @@ def test_trigger_cancel_job( load_manager = LoadManager() load_manager.trigger_manual_job(source_data_dcatus["id"]) - jobs = interface_no_jobs.get_all_harvest_jobs_by_filter( - {"harvest_source_id": source_data_dcatus["id"]} + source_id = source_data_dcatus["id"] + jobs = interface_no_jobs.pget_harvest_jobs( + facets=f"harvest_source_id = '{source_id}'" ) task_guid_val = "3a24b55a02b0-eb7b-4eeb-9f45-645cedd3d93b" diff --git a/tests/integration/app/test_login_required.py b/tests/integration/app/test_login_required.py index 09af18e7..7c9da4ab 100644 --- a/tests/integration/app/test_login_required.py +++ b/tests/integration/app/test_login_required.py @@ -24,89 +24,94 @@ def wrapper(*args, **kwargs): return inner -# Logged in user can see protected page -@force_login(email="test@data.gov") -def test_harvest_edit__logged_in(client, interface_no_jobs, source_data_dcatus): - res = client.get(f"/harvest_source/config/edit/{source_data_dcatus['id']}") - assert res.status_code == 200 - - -# Logged out user cannot see protected page -def test_harvest_edit__logged_out(client, interface_no_jobs, source_data_dcatus): - res = client.get(f"/harvest_source/config/edit/{source_data_dcatus['id']}") - redirect_str = 'You should be redirected automatically to the target URL: /login' - assert res.status_code == 302 - assert res.text.find(redirect_str) != -1 - - -# Logged in user is redirected away from bad url -@force_login(email="test@data.gov") -def test_harvest_edit_bad_source_url(client, interface_no_jobs): - res = client.get("/harvest_source/config/edit/1234") - redirect_str = 'You should be redirected automatically to the target URL: /harvest_sources/' - assert res.status_code == 302 - assert res.text.find(redirect_str) != -1 - - -# Logged in user can see the organization action buttons -@force_login(email="test@data.gov") -def test_org_edit_buttons__logged_in( - client, interface_with_multiple_jobs, source_data_dcatus, organization_data -): - res = client.get(f"/organization/{organization_data['id']}") - button_string_text = '
' - org_edit_text = f'