Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Decouple flask app #115

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 38 additions & 34 deletions app/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,6 @@ def add_organization():
)


@mod.route("/organization/", methods=["GET"])
@mod.route("/organizations/", methods=["GET"])
def view_organizations():
organizations = db.get_all_organizations()
Expand Down Expand Up @@ -472,16 +471,16 @@ def view_harvest_source_data(source_id: str):
count=True,
skip_pagination=True,
source_id=source.id,
facets=["ckan_id != null"],
facets="ckan_id is not null",
)
error_records_count = db.get_harvest_records_by_source(
count=True,
skip_pagination=True,
source_id=source.id,
facets=["status = 'error'"],
facets="status = 'error'",
)
# TODO: wire in paginated jobs query
jobs = db.get_all_harvest_jobs_by_filter({"harvest_source_id": source.id})
# TODO: wire in paginated jobs htmx refresh ui & route
jobs = db.pget_harvest_jobs(facets=f"harvest_source_id = '{source.id}'")
next_job = "N/A"
future_jobs = db.get_new_harvest_jobs_by_source_in_future(source.id)
if len(future_jobs):
Expand Down Expand Up @@ -528,7 +527,6 @@ def view_harvest_source_data(source_id: str):
return render_template("view_source_data.html", data=data)


@mod.route("/harvest_source/", methods=["GET"])
@mod.route("/harvest_sources/", methods=["GET"])
def view_harvest_sources():
sources = db.get_all_harvest_sources()
Expand Down Expand Up @@ -745,18 +743,45 @@ def get_harvest_record(record_id):
def get_harvest_records():
job_id = request.args.get("harvest_job_id")
source_id = request.args.get("harvest_source_id")
page = request.args.get("page")
paginate = request.args.get("paginate", type=bool)
skip_pagination = request.args.get("skip_pagination", type=bool)
count = request.args.get("count", type=bool)
page = request.args.get("page", type=int)
facets = request.args.get("facets", "")
if job_id:
records = db.get_harvest_records_by_job(job_id, page)
if not records:
return "No harvest records found for this harvest job", 404
records = db.get_harvest_records_by_job(
job_id,
page=page,
paginate=paginate,
skip_pagination=skip_pagination,
facets=facets,
)

elif source_id:
records = db.get_harvest_records_by_source(source_id, page)
records = db.get_harvest_records_by_source(
source_id,
page=page,
paginate=paginate,
skip_pagination=skip_pagination,
count=count,
facets=facets,
)
if not records:
return "No harvest records found for this harvest source", 404
else:
records = db.pget_harvest_records(page)
return db._to_dict(records)
records = db.pget_harvest_records(
page=page,
paginate=paginate,
skip_pagination=skip_pagination,
facets=facets,
)

if not records:
return "No harvest records found for this query", 404
elif isinstance(records, int):
return f"{records} records found", 200
else:
return db._to_dict(records)


@mod.route("/harvest_record/<record_id>/raw", methods=["GET"])
Expand Down Expand Up @@ -818,27 +843,6 @@ def get_data_sources():
return render_template("get_data_sources.html", sources=source, organizations=org)


## Test interface, will remove later
@mod.route("/delete_all_records", methods=["DELETE"])
def delete_all_records():
    """Test-only route: delete every harvest record through the db interface.

    Returns:
        str: a fixed confirmation message.
    """
    db.delete_all_harvest_records()
    message = "All harvest records deleted"
    return message


## Test interface, will remove later
@mod.route("/add_harvest_job_error", methods=["POST"])
def add_harvest_job_error():
    """Test-only route: store the posted JSON body as a harvest job error.

    Returns:
        str: a fixed confirmation message.
    """
    payload = request.json
    db.add_harvest_job_error(payload)
    return "Added harvest job error"


## Test interface, will remove later
@mod.route("/add_harvest_record_error", methods=["POST"])
def add_harvest_record_error():
    """Test-only route: store the posted JSON body as a harvest record error.

    Returns:
        Whatever ``db._to_dict`` produces for the value returned by
        ``db.add_harvest_record_error``.
    """
    return db._to_dict(db.add_harvest_record_error(request.json))


def register_routes(app):
app.register_blueprint(mod)
app.register_blueprint(user)
Expand Down
68 changes: 41 additions & 27 deletions database/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@

import ckanapi
from ckanapi import RemoteCKAN
from sqlalchemy import create_engine, func, inspect, or_, select, text
from sqlalchemy import create_engine, func, inspect, select, text
from sqlalchemy.exc import NoResultFound
from sqlalchemy.orm import scoped_session, sessionmaker

from harvester.utils.general_utils import query_filter_builder

from .models import (
HarvestJob,
HarvestJobError,
Expand Down Expand Up @@ -224,8 +226,9 @@ def _clear_harvest_records():

ckan_ids = [record.ckan_id for record in records if record.ckan_id is not None]
error_records = [record for record in records if record.status == "error"]
jobs_in_progress = self.get_all_harvest_jobs_by_filter(
{"harvest_source_id": source.id, "status": "in_progress"}
jobs_in_progress = self.pget_harvest_jobs(
facets=f"harvest_source_id = '{source.id}', 'status' = 'in_progress'",
paginate=False,
)

# Ensure no jobs are in progress
Expand Down Expand Up @@ -325,10 +328,6 @@ def add_harvest_job(self, job_data):
def get_harvest_job(self, job_id):
    """Look up a single harvest job by its id.

    Args:
        job_id: value matched against ``HarvestJob.id``.

    Returns:
        The first matching ``HarvestJob`` row, or ``None`` when the query
        yields no rows.
    """
    matching_jobs = self.db.query(HarvestJob).filter_by(id=job_id)
    return matching_jobs.first()

def get_all_harvest_jobs_by_filter(self, filter):
    """Return all harvest jobs whose columns equal the given values.

    Args:
        filter: mapping of ``HarvestJob`` column name to required value; it
            is expanded into keyword arguments for ``filter_by``.

    Returns:
        list: a new list of the matching ``HarvestJob`` rows (empty when
        nothing matches).
    """
    query = self.db.query(HarvestJob).filter_by(**filter)
    matches = query.all()
    return list(matches or [])

def get_first_harvest_jobs_by_filter(self, filter):
harvest_job = (
self.db.query(HarvestJob)
Expand Down Expand Up @@ -361,11 +360,6 @@ def get_new_harvest_jobs_by_source_in_future(self, source_id):
)
return [job for job in harvest_jobs or []]

def get_harvest_jobs_by_faceted_filter(self, attr, values):
    """Return harvest jobs where one column equals any of several values.

    Args:
        attr: name of the ``HarvestJob`` attribute to compare.
        values: iterable of candidate values; the per-value equality tests
            are combined with ``or_``.

    Returns:
        list: a new list of the matching ``HarvestJob`` rows.
    """
    conditions = []
    for candidate in values:
        conditions.append(getattr(HarvestJob, attr) == candidate)

    any_match = or_(*conditions)
    rows = self.db.query(HarvestJob).filter(any_match).all()
    return list(rows)

def update_harvest_job(self, job_id, updates):
try:
job = self.db.get(HarvestJob, job_id)
Expand Down Expand Up @@ -463,6 +457,7 @@ def add_harvest_record(self, record_data):
self.db.rollback()
return None

# TODO: should we delete this if it's not used anywhere in the code?
def add_harvest_records(self, records_data: list) -> dict:
"""
Add many records at once
Expand Down Expand Up @@ -524,6 +519,22 @@ def get_latest_harvest_records_by_source(self, source_id):

return [dict(zip(fields, record)) for record in records]

def get_all_latest_harvest_records_by_source(self, source_id):
    """Return the most recently created record per identifier for a source.

    Selects from ``harvest_record`` with ``DISTINCT ON (identifier)`` and
    orders by ``identifier, date_created DESC``, so a single row — the one
    with the latest ``date_created`` — is kept for each identifier.

    Args:
        source_id: value matched against ``harvest_record.harvest_source_id``.

    Returns:
        list[dict]: one dict per selected row, keyed by result column name.
        Datetimes are returned as datetime objects, not strings.
    """
    # Bind source_id as a parameter rather than formatting it into the SQL
    # text: an id containing a quote can no longer break or inject into the
    # statement.
    sql = text(
        """SELECT DISTINCT ON (identifier) *
        FROM harvest_record
        WHERE harvest_source_id = :source_id
        ORDER BY identifier, date_created DESC"""
    )

    # str() keeps the behaviour of the previous f-string interpolation for
    # callers that pass a non-string id.
    res = self.db.execute(sql, {"source_id": str(source_id)})

    fields = list(res.keys())
    return [dict(zip(fields, record)) for record in res.fetchall()]

def close(self):
if hasattr(self.db, "remove"):
self.db.remove()
Expand Down Expand Up @@ -602,29 +613,32 @@ def verify_user(self, usr_data):
#### PAGINATED QUERIES ####
@count
@paginate
def pget_harvest_jobs(self, filter=text(""), **kwargs):
return self.db.query(HarvestJob).filter(filter)
def pget_harvest_jobs(self, facets="", **kwargs):
facet_string = query_filter_builder(None, facets)
return self.db.query(HarvestJob).filter(text(facet_string))

@count
@paginate
def pget_harvest_records(self, filter=text(""), **kwargs):
return self.db.query(HarvestRecord).filter(filter)
def pget_harvest_records(self, facets="", **kwargs):
return self.db.query(HarvestRecord).filter(text(facets))

@count
@paginate
def pget_harvest_job_errors(self, filter=text(""), **kwargs):
return self.db.query(HarvestJobError).filter(filter)
def pget_harvest_job_errors(self, facets="", **kwargs):
return self.db.query(HarvestJobError).filter(text(facets))

@count
@paginate
def pget_harvest_record_errors(self, filter=text(""), **kwargs):
return self.db.query(HarvestRecordError).filter(filter)
def pget_harvest_record_errors(self, facets="", **kwargs):
return self.db.query(HarvestRecordError).filter(text(facets))

#### FACETED BUILDER QUERIES ####
def get_harvest_records_by_job(self, job_id, facets=[], **kwargs):
filter_string = " AND ".join([f"harvest_job_id = '{job_id}'"] + facets)
return self.pget_harvest_records(filter=text(filter_string), **kwargs)
#### FILTERED BUILDER QUERIES ####
def get_harvest_records_by_job(self, job_id, facets="", **kwargs):
facet_string = query_filter_builder(f"harvest_job_id = '{job_id}'", facets)
return self.pget_harvest_records(facets=facet_string, **kwargs)

def get_harvest_records_by_source(self, source_id, facets=[], **kwargs):
filter_string = " AND ".join([f"harvest_source_id = '{source_id}'"] + facets)
return self.pget_harvest_records(filter=text(filter_string), **kwargs)
def get_harvest_records_by_source(self, source_id, facets="", **kwargs):
facet_string = query_filter_builder(
f"harvest_source_id = '{source_id}'", facets
)
return self.pget_harvest_records(facets=facet_string, **kwargs)
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ services:
retries: 5
transformer:
image: ghcr.io/gsa/mdtranslator:latest
restart: always
env_file:
- .env
environment:
Expand Down
Loading
Loading