diff --git a/tifeatures/dbmodel.py b/tifeatures/dbmodel.py index f9c54a2..200d10e 100644 --- a/tifeatures/dbmodel.py +++ b/tifeatures/dbmodel.py @@ -90,16 +90,22 @@ def id_column_info(self) -> Column: # type: ignore if col.name == self.id_column: return col - def columns(self, properties: Optional[List[str]] = None) -> List[str]: + def columns( + self, properties: Optional[List[str]] = None, nogeo: bool = False + ) -> List[str]: """Return table columns optionally filtered to only include columns from properties.""" - cols = [c.name for c in self.properties] + if nogeo: + cols = [c.name for c in self.properties if not c.type.startswith("geo")] + else: + cols = [c.name for c in self.properties] if properties is not None: if self.id_column not in properties: properties.append(self.id_column) - geom_col = self.geometry_column() - if geom_col: - properties.append(geom_col.name) + if not nogeo: + geom_col = self.geometry_column() + if geom_col: + properties.append(geom_col.name) cols = [col for col in cols if col in properties] diff --git a/tifeatures/factory.py b/tifeatures/factory.py index f58445a..06521c5 100644 --- a/tifeatures/factory.py +++ b/tifeatures/factory.py @@ -5,6 +5,7 @@ from dataclasses import dataclass, field from typing import Any, Callable, Dict, Generator, Iterable, List, Optional +import buildpg import jinja2 from pygeofilter.ast import AstType @@ -564,7 +565,7 @@ async def items( description="Limits the number of features in the response.", ), offset: Optional[int] = Query( - None, + 0, ge=0, description="Starts the response at an offset.", ), @@ -578,9 +579,12 @@ async def items( description="Simplify the output geometry to given threshold in decimal degrees.", ), output_type: Optional[MediaType] = Depends(ItemsOutputType), + count_exact: Optional[bool] = Query( + False, + description="Return an exact count of features rather than an estimate.", + ), ): - offset = offset or 0 - + pool = request.app.state.pool # =VALUE - filter features for a 
property having a value. Multiple property filters are ANDed together. exclude = [ "f", @@ -603,195 +607,234 @@ async def items( for (key, value) in request.query_params.items() if key.lower() not in exclude ] - - items, matched_items = await collection.features( - request.app.state.pool, - ids_filter=ids_filter, - bbox_filter=bbox_filter, - datetime_filter=datetime_filter, - properties_filter=properties_filter, - cql_filter=cql_filter, - sortby=sortby, - properties=properties, - limit=limit, - offset=offset, + _select = collection._select(properties) + _select += buildpg.logic.Empty().comma( + buildpg.V(collection.id_column).as_("itemid") + ) + _geom = collection._geom(geom_column, bbox_only, simplify) + if _geom: + _select += buildpg.logic.Empty().comma(_geom) + + _from = collection._from() + _where = collection._where( + ids=ids_filter, + datetime=datetime_filter, + bbox=bbox_filter, + properties=properties_filter, + cql=cql_filter, geom=geom_column, dt=datetime_column, - bbox_only=bbox_only, - simplify=simplify, ) - - if output_type in ( - MediaType.csv, - MediaType.json, - MediaType.ndjson, - ): - if items and items[0].geometry is not None: - rows = ( - { - "collectionId": collection.id, - "itemId": f.id, - **f.properties, - "geometry": f.geometry.wkt, - } - for f in items - ) - - else: - rows = ( - { - "collectionId": collection.id, - "itemId": f.id, - **f.properties, - } - for f in items - ) - - # CSV Response - if output_type == MediaType.csv: - return StreamingResponse( - create_csv_rows(rows), - media_type=MediaType.csv, - headers={ - "Content-Disposition": "attachment;filename=items.csv" - }, - ) - - # JSON Response - if output_type == MediaType.json: - return JSONResponse([row for row in rows]) - - # NDJSON Response - if output_type == MediaType.ndjson: - return StreamingResponse( - (json.dumps(row) + "\n" for row in rows), - media_type=MediaType.ndjson, - headers={ - "Content-Disposition": "attachment;filename=items.ndjson" - }, - ) - - qs = "?" 
+ str(request.query_params) if request.query_params else "" - links = [ - model.Link( - title="Collection", - href=self.url_for( - request, "collection", collectionId=collection.id - ), - rel="collection", - type=MediaType.json, - ), - model.Link( - title="Items", - href=self.url_for(request, "items", collectionId=collection.id) - + qs, - rel="self", - type=MediaType.geojson, - ), - ] - - items_returned = len(items) - - if (matched_items - items_returned) > offset: - next_offset = offset + items_returned - query_params = QueryParams( - {**request.query_params, "offset": next_offset} - ) - url = ( - self.url_for(request, "items", collectionId=collection.id) - + f"?{query_params}" - ) - links.append( - model.Link( - href=url, - rel="next", - type=MediaType.geojson, - title="Next page", - ), - ) - - if offset: - query_params = dict(request.query_params) - query_params.pop("offset") - prev_offset = max(offset - items_returned, 0) - if prev_offset: - query_params = QueryParams({**query_params, "offset": prev_offset}) - else: - query_params = QueryParams({**query_params}) - - url = self.url_for(request, "items", collectionId=collection.id) - if query_params: - url += f"?{query_params}" - - links.append( - model.Link( - href=url, - rel="prev", - type=MediaType.geojson, - title="Previous page", - ), + _sortby = collection._sortby(sortby, limit, offset) + _features = _select + _from + _where + _sortby + total_count = await collection.query_count(pool, _from, _where, count_exact) + if total_count == 0: + raise NotFound + print(total_count) + print(output_type) + + collection_href = str( + self.url_for( + request, + "collection", + collectionId=collection.id, ) - - data = model.Items( - id=collection.id, - title=collection.title or collection.id, - description=collection.description or collection.title or collection.id, - numberMatched=matched_items, - numberReturned=items_returned, - links=links, - features=[ - model.Item( - **{ - **feature.dict(), - "links": [ - model.Link( 
- title="Collection", - href=self.url_for( - request, - "collection", - collectionId=collection.id, - ), - rel="collection", - type=MediaType.json, - ), - model.Link( - title="Item", - href=self.url_for( - request, - "item", - collectionId=collection.id, - itemId=feature.properties[collection.id_column], - ), - rel="item", - type=MediaType.json, - ), - ], - } - ) - for feature in items - ], ) - # HTML Response - if output_type == MediaType.html: - return self._create_html_response( - request, - data.json(exclude_none=True), - template_name="items", - ) + qs = "?" + str(request.query_params) if request.query_params else "" - # GeoJSONSeq Response - elif output_type == MediaType.geojsonseq: + if output_type == MediaType.geojsonseq: return StreamingResponse( - data.json_seq(exclude_none=True), + collection.query_geojson_rows(pool, _features, collection_href), media_type=MediaType.geojsonseq, headers={ - "Content-Disposition": "attachment;filename=items.geojson" + "Content-Disposition": "attachment;filename=items.geojsonseq" }, ) - # Default to GeoJSON Response - return data + if output_type == MediaType.geojson: + return StreamingResponse( + collection.query_geojson( + pool, + _features, + collection_href, + total_count, + query_params=request.query_params, + ), + media_type=MediaType.geojson, + ) + + # if output_type in ( + # MediaType.csv, + # MediaType.json, + # MediaType.ndjson, + # ): + # if items and items[0].geometry is not None: + # rows = ( + # { + # "collectionId": collection.id, + # "itemId": f.id, + # **f.properties, + # "geometry": f.geometry.wkt, + # } + # for f in items + # ) + + # else: + # rows = ( + # { + # "collectionId": collection.id, + # "itemId": f.id, + # **f.properties, + # } + # for f in items + # ) + + # # CSV Response + # if output_type == MediaType.csv: + # return StreamingResponse( + # create_csv_rows(rows), + # media_type=MediaType.csv, + # headers={ + # "Content-Disposition": "attachment;filename=items.csv" + # }, + # ) + + # # JSON 
Response + # if output_type == MediaType.json: + # return JSONResponse([row for row in rows]) + + # # NDJSON Response + # if output_type == MediaType.ndjson: + # return StreamingResponse( + # (json.dumps(row) + "\n" for row in rows), + # media_type=MediaType.ndjson, + # headers={ + # "Content-Disposition": "attachment;filename=items.ndjson" + # }, + # ) + + # qs = "?" + str(request.query_params) if request.query_params else "" + # links = [ + # model.Link( + # title="Collection", + # href=self.url_for( + # request, "collection", collectionId=collection.id + # ), + # rel="collection", + # type=MediaType.json, + # ), + # model.Link( + # title="Items", + # href=self.url_for(request, "items", collectionId=collection.id) + # + qs, + # rel="self", + # type=MediaType.geojson, + # ), + # ] + + # items_returned = len(items) + + # if (matched_items - items_returned) > offset: + # next_offset = offset + items_returned + # query_params = QueryParams( + # {**request.query_params, "offset": next_offset} + # ) + # url = ( + # self.url_for(request, "items", collectionId=collection.id) + # + f"?{query_params}" + # ) + # links.append( + # model.Link( + # href=url, + # rel="next", + # type=MediaType.geojson, + # title="Next page", + # ), + # ) + + # if offset: + # query_params = dict(request.query_params) + # query_params.pop("offset") + # prev_offset = max(offset - items_returned, 0) + # if prev_offset: + # query_params = QueryParams({**query_params, "offset": prev_offset}) + # else: + # query_params = QueryParams({**query_params}) + + # url = self.url_for(request, "items", collectionId=collection.id) + # if query_params: + # url += f"?{query_params}" + + # links.append( + # model.Link( + # href=url, + # rel="prev", + # type=MediaType.geojson, + # title="Previous page", + # ), + # ) + + # data = model.Items( + # id=collection.id, + # title=collection.title or collection.id, + # description=collection.description or collection.title or collection.id, + # numberMatched=matched_items, 
+ # numberReturned=items_returned, + # links=links, + # features=[ + # model.Item( + # **{ + # **feature.dict(), + # "links": [ + # model.Link( + # title="Collection", + # href=self.url_for( + # request, + # "collection", + # collectionId=collection.id, + # ), + # rel="collection", + # type=MediaType.json, + # ), + # model.Link( + # title="Item", + # href=self.url_for( + # request, + # "item", + # collectionId=collection.id, + # itemId=feature.properties[collection.id_column], + # ), + # rel="item", + # type=MediaType.json, + # ), + # ], + # } + # ) + # for feature in items + # ], + # ) + + # # HTML Response + # if output_type == MediaType.html: + # return self._create_html_response( + # request, + # data.json(exclude_none=True), + # template_name="items", + # ) + + # # GeoJSONSeq Response + # elif output_type == MediaType.geojsonseq: + # return StreamingResponse( + # data.json_seq(exclude_none=True), + # media_type=MediaType.geojsonseq, + # headers={ + # "Content-Disposition": "attachment;filename=items.geojson" + # }, + # ) + + # # Default to GeoJSON Response + # return data @self.router.get( "/collections/{collectionId}/items/{itemId}", diff --git a/tifeatures/layer.py b/tifeatures/layer.py index 0053f61..62259cc 100644 --- a/tifeatures/layer.py +++ b/tifeatures/layer.py @@ -1,8 +1,10 @@ """tifeatures.layers.""" import abc +import json import re from dataclasses import dataclass +from time import time from typing import Any, ClassVar, Dict, List, Optional, Tuple from buildpg import asyncpg, clauses @@ -25,6 +27,8 @@ from tifeatures.filter.evaluate import to_filter from tifeatures.filter.filters import bbox_to_wkt +from starlette.datastructures import QueryParams + # Links to geojson schema geojson_schema = { "GEOMETRY": "https://geojson.org/schema/Geometry.json", @@ -114,7 +118,7 @@ def bounds_default(cls, values): return values def _select(self, properties: Optional[List[str]]): - return clauses.Select(self.columns(properties)) + return 
clauses.Select(self.columns(properties, True)) def _select_count(self): return clauses.Select(pg_funcs.count("*")) @@ -124,12 +128,17 @@ def _from(self): def _geom( self, - geometry_column: Optional[GeometryColumn], + geom_column: Optional[str], bbox_only: Optional[bool], simplify: Optional[float], ): - if geometry_column is None: - return pg_funcs.cast(None, "json") + if geom_column and geom_column.lower() == "none": + return None + + geometry_column = self.geometry_column(geom_column) + + if not geometry_column: + raise InvalidGeometryColumnName(f"Invalid Geometry Column: {geom_column}.") g = logic.V(geometry_column.name) g = pg_funcs.cast(g, "geometry") @@ -146,7 +155,7 @@ def _geom( simplify, ) - g = logic.Func("ST_AsGeoJson", g) + g = g.as_("geom") return g @@ -157,8 +166,8 @@ def _where( bbox: Optional[List[float]] = None, properties: Optional[List[Tuple[str, Any]]] = None, cql: Optional[AstType] = None, - geom: str = None, - dt: str = None, + geom: Optional[str] = None, + dt: Optional[str] = None, ): """Construct WHERE query.""" wheres = [logic.S(True)] @@ -262,8 +271,11 @@ def _datetime_filter_to_sql(self, interval: List[str], dt_name: str): logic.V(dt_name) < logic.S(pg_funcs.cast(end, "timestamptz")), ) - def _sortby(self, sortby: Optional[str]): + def _sortby(self, sortby: Optional[str], limit: int, offset: int): + pk = self.id_column sorts = [] + pkseen = False + if sortby: for s in sortby.strip().split(","): parts = re.match( @@ -272,22 +284,24 @@ def _sortby(self, sortby: Optional[str]): direction = parts["direction"] column = parts["column"].strip() + if column == pk: + pkseen = True if self.get_column(column): if direction == "-": sorts.append(logic.V(column).desc()) + else: sorts.append(logic.V(column)) else: raise InvalidPropertyName(f"Property {column} does not exist.") - else: - sorts.append(logic.V(self.id_column)) + if not pkseen: + sorts.append(logic.V(pk)) - return clauses.OrderBy(*sorts) + return clauses.OrderBy(*sorts) + 
clauses.Limit(limit) + clauses.Offset(offset) def _features_query( self, - *, ids_filter: Optional[List[str]] = None, bbox_filter: Optional[List[float]] = None, datetime_filter: Optional[List[str]] = None, @@ -295,8 +309,8 @@ def _features_query( cql_filter: Optional[AstType] = None, sortby: Optional[str] = None, properties: Optional[List[str]] = None, - geom: str = None, - dt: str = None, + geom: Optional[str] = None, + dt: Optional[str] = None, limit: Optional[int] = None, offset: Optional[int] = None, ): @@ -326,8 +340,8 @@ def _features_count_query( datetime_filter: Optional[List[str]] = None, properties_filter: Optional[List[Tuple[str, str]]] = None, cql_filter: Optional[AstType] = None, - geom: str = None, - dt: str = None, + geom: Optional[str] = None, + dt: Optional[str] = None, ): """Build features COUNT query.""" return ( @@ -344,97 +358,127 @@ def _features_count_query( ) ) - async def query( - self, - pool: asyncpg.BuildPgPool, - *, - ids_filter: Optional[List[str]] = None, - bbox_filter: Optional[List[float]] = None, - datetime_filter: Optional[List[str]] = None, - properties_filter: Optional[List[Tuple[str, str]]] = None, - cql_filter: Optional[AstType] = None, - sortby: Optional[str] = None, - properties: Optional[List[str]] = None, - geom: str = None, - dt: str = None, - limit: Optional[int] = None, - offset: Optional[int] = None, - bbox_only: Optional[bool] = None, - simplify: Optional[float] = None, - ) -> Tuple[FeatureCollection, int]: - """Build and run Pg query.""" - if geom and geom.lower() != "none" and not self.geometry_column(geom): - raise InvalidGeometryColumnName(f"Invalid Geometry Column: {geom}.") - - sql_query = """ - WITH - features AS ( - :features_q - ), - total_count AS ( - :count_q - ) - SELECT json_build_object( - 'type', 'FeatureCollection', - 'features', - ( - SELECT - json_agg( - json_build_object( - 'type', 'Feature', - 'id', :id_column, - 'geometry', :geometry_q, - 'properties', to_jsonb( features.* ) - 
- :geom_columns::text[] - ) - ) - FROM features - ), - 'total_count', - ( - SELECT count FROM total_count - ) - ) - ; - """ + async def query_count(self, pool, _from_clause, _where_clause, count_exact): + """Get the estimated count/cost from query.""" + from_where_clause = _from_clause + _where_clause + if count_exact: + async with pool.acquire() as conn: + q, p = (self._select_count() + from_where_clause).render() + return await conn.fetchval(q, *p) q, p = render( - sql_query, - features_q=self._features_query( - ids_filter=ids_filter, - bbox_filter=bbox_filter, - datetime_filter=datetime_filter, - properties_filter=properties_filter, - cql_filter=cql_filter, - sortby=sortby, - properties=properties, - geom=geom, - dt=dt, - limit=limit, - offset=offset, - ), - count_q=self._features_count_query( - ids_filter=ids_filter, - bbox_filter=bbox_filter, - datetime_filter=datetime_filter, - properties_filter=properties_filter, - cql_filter=cql_filter, - geom=geom, - dt=dt, - ), - id_column=logic.V(self.id_column), - geometry_q=self._geom( - geometry_column=self.geometry_column(geom), - bbox_only=bbox_only, - simplify=simplify, - ), - geom_columns=[g.name for g in self.geometry_columns], + """ + EXPLAIN (FORMAT JSON) + SELECT 1 + :from_where_clause + """, + from_where_clause=from_where_clause, ) async with pool.acquire() as conn: - items = await conn.fetchval(q, *p) + explain = await conn.fetchval(q, *p) + return explain[0]["Plan"]["Plan Rows"] - return ( - FeatureCollection(features=items.get("features") or []), - items["total_count"], + async def query_geojson_rows(self, pool, features_query, collection_href): + """Build and run Pg query to get json rows.""" + st = time() + q, p = render( + """ + WITH features AS ( + :features_query + ) + SELECT + jsonb_build_object( + 'type', 'Feature', + 'id', itemid, + 'geometry', ST_ASGeoJson(geom)::json, + 'properties', to_jsonb( features.* ) - '{itemid,geom}'::text[], + 'links', jsonb_build_array( + jsonb_build_object( + 'title', 
'Collection', + 'href', :collection_href::text, + 'rel', 'collection', + 'type', 'application/json' + ), + jsonb_build_object( + 'title', 'Item', + 'href', format('%s/items/%s',:collection_href::text,itemid), + 'rel', 'item', + 'type', 'application/json' + ) + ) + )::text + FROM features + ; + """, + features_query=features_query, + collection_href=collection_href, ) + print(q, p) + async with pool.acquire() as conn: + async with conn.transaction(): + async for record in conn.cursor(q, *p, prefetch=50, timeout=120): + yield record[0] + "\n" + + async def query_geojson( + self, pool, features_query, collection_href, total, query_params=None + ): + """Build and run Pg query to get json rows.""" + cnt = 0 + yield '{"type":"FeatureCollection","features":[' + async for rec in self.query_geojson_rows(pool, features_query, collection_href): + cnt += 1 + if cnt > 1: + yield "," + rec + else: + yield rec + links = [ + { + "title": "Collection", + "href": collection_href, + "rel": "collection", + "type": "application/json", + }, + { + "title": "Items", + "href": f"{collection_href}/items?{str(query_params)}", + "rel": "self", + "type": "application/json", + }, + ] + + if query_params: + offset = int(query_params.get("offset", 0)) + else: + offset = 0 + + if (total - cnt) > offset: + next_offset = offset + cnt + query_params = QueryParams({**query_params, "offset": next_offset}) + + links.append( + { + "title": "Items", + "href": f"{collection_href}/items?{str(query_params)}", + "rel": "next", + "type": "application/json", + } + ) + + if offset > 0: + prev_offset = max(offset - cnt, 0) + query_params = QueryParams({**query_params, "offset": prev_offset}) + + links.append( + { + "title": "Items", + "href": f"{collection_href}/items?{str(query_params)}", + "rel": "prev", + "type": "application/json", + } + ) + + links_str = json.dumps(links) + + yield f'],"numberMatched":{total},"numberReturned":{cnt}, "links":{links_str}}}' async def features( self,