Consensus.EsriConnector module

Extending EsriConnector() class

This module provides a class for interacting with the Esri REST API. It creates a dictionary of Service objects given the URL of the server and adds methods to extract the metadata for any of them. You can apply the EsriConnector() class to a new server by calling my_new_connection = EsriConnector(base_url=your_new_server) or, if you prefer, by creating a subclass:

from typing import Dict

from Consensus.EsriConnector import EsriConnector

# your_new_server is a placeholder for the base URL of your Esri server

class NewClass(EsriConnector):
    def __init__(self, max_retries: int = 10, retry_delay: int = 2) -> None:
        super().__init__(max_retries, retry_delay)
        self.base_url = your_new_server
        print(f"Connecting to {your_new_server}")

    def field_matching_condition(self, field: Dict[str, str]) -> bool:
        # accept only fields that end with 'CD' or 'NM'; reject everything else
        return field['name'].upper().endswith(('CD', 'NM'))

This is the basic building block that the Consensus package uses to interact with Esri REST APIs such as Open Geography Portal and TfL Open Data Hub. It is designed to be extended to provide additional functionality, such as custom methods for specific use cases.
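
To illustrate how a condition like the one in NewClass behaves, here is a minimal standalone sketch. It does not import Consensus; the field dictionaries are made up for illustration, and real ones come from the server's metadata.

```python
from typing import Dict


def field_matching_condition(field: Dict[str, str]) -> bool:
    """Accept only fields whose name ends with 'CD' or 'NM' (case-insensitive)."""
    return field["name"].upper().endswith(("CD", "NM"))


# Made-up field metadata, for illustration only.
sample_fields = [
    {"name": "WD23CD", "type": "esriFieldTypeString"},
    {"name": "wd23nm", "type": "esriFieldTypeString"},
    {"name": "Shape__Area", "type": "esriFieldTypeDouble"},
]

# Keep the names of the fields that pass the condition.
matchable = [f["name"] for f in sample_fields if field_matching_condition(f)]
print(matchable)
```

Returning the boolean expression directly means every field gets an explicit True or False, rather than None for rejected fields.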

FeatureServer() class example

The FeatureServer() class, on the other hand, is used to download data from the Esri REST API. For example, to download the 2023 ward boundary data for Brockley in Lewisham from Open Geography Portal:

from Consensus.EsriConnector import FeatureServer
from Consensus.EsriServers import OpenGeography
from Consensus.utils import where_clause_maker
import asyncio

async def download_test_data():
    og = OpenGeography(max_retries=30, retry_delay=2)
    await og.initialise()

    fs_service_table = og.service_table
    fs = FeatureServer()

    column_name = 'WD23NM'
    geographic_areas = ['Brockley']
    service_name = 'Wards_December_2023_Boundaries_UK_BSC'
    layers = og.select_layers_by_service(service_name=service_name)  # list the layers of the 'Wards_December_2023_Boundaries_UK_BSC' service
    layer_full_name = layers[0].full_name  # choose the first layer; its ``full_name`` attribute selects it in ``fs.setup()`` and is used when creating the ``where_clause``

    where_clause = where_clause_maker(string_list=geographic_areas, column_name=column_name, service_name=layer_full_name)  # a helper function that creates the SQL where clause for Esri Servers

    await fs.setup(full_name=layer_full_name, service_table=fs_service_table, max_retries=30, retry_delay=2, chunk_size=50)
    output = await fs.download(where_clause=where_clause, return_geometry=True)
    print(output)

asyncio.run(download_test_data())
class Consensus.EsriConnector.EsriConnector(max_retries=10, retry_delay=2, server_type='feature', base_url='', proxy=None, matchable_fields_extension=[])

Bases: object

Main class for connecting to Esri servers. This class uses Consensus.ConfigManager.ConfigManager() to load the config.json file for proxies. Specifically, the class uses https proxy.

base_url

The base URL of the Esri server. Built-in modules that use EsriConnector() class set their own base_url.

Type:

str

max_retries

The maximum number of retries for HTTP requests.

Type:

int

retry_delay

The delay in seconds between retries.

Type:

int

server_types

A dictionary of server types and their corresponding suffixes.

Type:

Dict[str, str]

services

A list of Service objects.

Type:

List[Service]

service_table

A Pandas DataFrame containing the service metadata.

Type:

pd.DataFrame

__init__(max_retries: int = 10, retry_delay: int = 2, server_type: str = 'feature', base_url: str = "", proxy: str = None, matchable_fields_extension: List[str] = [])

Initialise class.

field_matching_condition(field: Dict[str, str])

Condition for matchable fields. This method is used by Service() to filter the fields that are added to the matchable_fields columns, which is subsequently used by SmartLinker() for matching data tables.

initialise()

Initialise class. Must be called to initialise the async session and run the async tasks.

_validate_response()

Validate the response from the Esri server.

_fetch_response(session: aiohttp.ClientSession)

Helper method to get response from Esri server.

get_layer_obj(service: Dict[str, str], session: aiohttp.ClientSession)

Call the get_layers() method for a Service object to get the list of Layer objects.

_load_all_services()

Load all services from the Esri server into self.service_table

print_object_data(layer_obj: Layer)

Print the object metadata.

print_all_services()

Print all services from the Esri server.

select_layers_by_service(service_name: str)

Return the list of Layer objects for a given service.

select_layers_by_layers(layer_name: str)

Find all Layer objects that share the same name.

metadata_as_pandas(included_services: List[str] = [])

Return the metadata of the services as a Pandas DataFrame.

build_lookup(parent_path: Path = Path(__file__).resolve().parent, included_services: List[str] = [], replace_old: bool = True)

Build a lookup table of the services. This method calls metadata_as_pandas() for each service, returns a Pandas DataFrame, and writes a JSON lookup file.

__init__(max_retries=10, retry_delay=2, server_type='feature', base_url='', proxy=None, matchable_fields_extension=[])

Initialise class.

Parameters:
  • max_retries (int) – The maximum number of retries for HTTP requests. Defaults to 10.

  • retry_delay (int) – The delay in seconds between retries. Defaults to 2.

  • base_url (str) – The base URL of the Esri server. Defaults to “”. Built-in modules that use EsriConnector() class set their own base_url.

  • proxy (str) – The proxy URL to use for requests. Defaults to None. Leave empty to make use of ConfigManager().

Returns:

None

async _fetch_response(session)

Helper method to fetch the response from the Esri server.

Parameters:

session (aiohttp.ClientSession) – The aiohttp.ClientSession object.

Returns:

The JSON response from the Esri server.

Return type:

Dict

async _load_all_services()

Load services into a dictionary.

Return type:

None

Returns:

None

async _validate_response()

Validate access to the base URL asynchronously using aiohttp. When a response is received, call _load_all_services() to load services into a dictionary.

Return type:

None

Returns:

None

async build_lookup(parent_path=Path(__file__).resolve().parent, included_services=[], replace_old=True)

Build a lookup table from scratch and save it to a JSON file.

Parameters:
  • parent_path (Path) – Parent path to save the lookup file.

  • included_services (List[str]) – List of services to include in the lookup. Defaults to [], which is interpreted as ‘all’.

  • replace_old (bool) – Whether to replace the old lookup file. Defaults to True.

Returns:

The lookup table as a pandas DataFrame.

Return type:

pd.DataFrame

async field_matching_condition(field)

Condition for matchable fields. This method is used by Service() to filter the fields that are added to the matchable_fields columns, which is subsequently used by SmartLinker() for matching data tables. It is meant to be overridden by users who want to change the condition for matchable fields. Each Esri ArcGIS server has its own rules, so the choice of condition is left to the user. If you are using a built-in server (e.g. TfL or Open Geography Portal), you do not need to touch this method.

Parameters:

field (Dict[str, str]) – The field dictionary. This is the input coming from Service(). This method should always accept a metadata dictionary describing the field.

Returns:

By default, True if the field name is in matchable_fields_extension, otherwise False.

Return type:

bool
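
The default behaviour described above can be sketched without the package. This is a minimal illustration, not the library's implementation: make_default_condition is a hypothetical helper, and the exact, case-sensitive comparison is an assumption.

```python
from typing import Callable, Dict, List


def make_default_condition(matchable_fields_extension: List[str]) -> Callable[[Dict[str, str]], bool]:
    """Build a predicate that accepts a field only if its name is in the extension list."""
    allowed = set(matchable_fields_extension)

    def condition(field: Dict[str, str]) -> bool:
        # Assumption: names are compared exactly, including case.
        return field["name"] in allowed

    return condition


condition = make_default_condition(["WD23CD"])
print(condition({"name": "WD23CD"}), condition({"name": "OBJECTID"}))
```

With an empty extension list, a predicate built this way rejects every field, which is why a custom condition is needed for servers that are not built in.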

async get_layer_obj(service, session)

Fetch metadata for a service and add it to the service table.

Parameters:
  • service (Dict[str, str]) – Dictionary of services.

  • session (aiohttp.ClientSession) – The aiohttp.ClientSession object.

Return type:

None

Returns:

None

async initialise()

Run this method to initialise the class session.

Return type:

None

Returns:

None

async metadata_as_pandas(included_services=[])

Asynchronously create a Pandas DataFrame of selected tables’ metadata.

Parameters:

included_services (List[str]) – A list of service names to include in the DataFrame. If empty, all services are included.

Returns:

A DataFrame containing the metadata of the selected services.

Return type:

pd.DataFrame
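
The "empty list means all services" convention can be sketched as follows. The helper select_services and the service name Other_Service are hypothetical, and plain dictionaries stand in for the real service metadata.

```python
from typing import Dict, List


def select_services(services: Dict[str, dict], included_services: List[str]) -> Dict[str, dict]:
    """Return all services when the filter is empty, otherwise only the named ones."""
    if not included_services:
        return dict(services)
    return {name: meta for name, meta in services.items() if name in included_services}


# Stand-in metadata; the real table is built from the server's responses.
services = {
    "Wards_December_2023_Boundaries_UK_BSC": {"type": "FeatureServer"},
    "Other_Service": {"type": "MapServer"},
}

everything = select_services(services, [])
wards_only = select_services(services, ["Wards_December_2023_Boundaries_UK_BSC"])
print(list(everything), list(wards_only))
```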

print_all_services()

Print name, type, and URL of all services available through Esri server.

Return type:

None

Returns:

None

print_object_data(layer_obj)

Print the data of a Layer object.

Parameters:

layer_obj (Layer) – The Layer object to print.

Return type:

None

Returns:

None

Added in version 1.1.1.

select_layers_by_layers(layer_name)

Find all Layer objects that share the same layer name.

Parameters:

layer_name (str) – The name of the layer to look up.

Returns:

A list of Layer objects whose name matches layer_name.

Return type:

List[Any]

Added in version 1.1.0

select_layers_by_service(service_name)

Print and output a subset of the service table.

Parameters:

service_name (str) – The name of the service to print.

Returns:

A list of Layer objects for the selected service.

Return type:

List[Any]

Added in version 1.1.0

class Consensus.EsriConnector.FeatureServer(proxy=None)

Bases: object

Download data from an Esri Feature Server asynchronously. This class uses Consensus.ConfigManager.ConfigManager() to load the config.json file for proxies. Specifically, the class uses https proxy.

feature_service

The Layer object.

Type:

Layer

max_retries

The maximum number of retries for a request.

Type:

int

retry_delay

The delay in seconds between retries.

Type:

int

chunk_size

The number of records to download in each chunk.

Type:

int

__init__(proxy: str)

Initialise class.

setup(full_name: str, service_name: str, layer_name: str, service_table: Dict[str, Service], max_retries: int, retry_delay: int, chunk_size: int)

Set up the FeatureServer Service object for downloading. You must provide the service_table and either full_name or both service_name and layer_name.

looper(session: aiohttp.ClientSession, link_url: str, params: Dict[str, Any])

Keep attempting to download data if the connection is lost.

chunker(session: aiohttp.ClientSession, params: Dict[str, Any])

Split the download into chunks of chunk_size records.

download(fileformat: str, return_geometry: bool, where_clause: str, output_fields: str, params: Dict[str, str], n_sample_rows: int)

Download data from the FeatureServer asynchronously.

Usage:
# In this example, we're using ``OpenGeography()`` sub-class

from Consensus.EsriConnector import FeatureServer
from Consensus.EsriServers import OpenGeography
from Consensus.utils import where_clause_maker
import asyncio

async def download_test_data():
    og = OpenGeography(max_retries=30, retry_delay=2)
    await og.initialise()

    fs_service_table = og.service_table
    fs = FeatureServer()

    column_name = 'WD23NM'
    geographic_areas = ['Brockley']
    service_name = 'Wards_December_2023_Boundaries_UK_BSC'
    layers = og.select_layers_by_service(service_name=service_name)  # list the layers of the 'Wards_December_2023_Boundaries_UK_BSC' service
    layer_full_name = layers[0].full_name  # choose the first layer; its ``full_name`` attribute selects it in ``fs.setup()`` and is used when creating the ``where_clause``

    where_clause = where_clause_maker(string_list=geographic_areas, column_name=column_name, service_name=layer_full_name)  # a helper function that creates the SQL where clause for Esri Servers

    await fs.setup(full_name=layer_full_name, service_table=fs_service_table, max_retries=30, retry_delay=2, chunk_size=50)
    output = await fs.download(where_clause=where_clause, return_geometry=True)
    print(output)

asyncio.run(download_test_data())
__init__(proxy=None)

Initialise class.

Parameters:

proxy (str) – The proxy URL to use for requests. Defaults to None. Leave empty to make use of ConfigManager().

Returns:

None

async chunker(session, params)

Download data in chunks asynchronously.

Parameters:
  • session (aiohttp.ClientSession) – The aiohttp session.

  • params (Dict[str, Any]) – The parameters for the query.

Returns:

The downloaded data as a dictionary.

Return type:

Dict[str, Any]
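
One way to split a download by chunk_size is to page through the records with the ArcGIS REST query parameters resultOffset and resultRecordCount. The sketch below only builds the per-chunk parameter dictionaries. Whether chunker() uses these exact parameters is an assumption, and build_chunk_params is a hypothetical helper.

```python
from typing import Any, Dict, List


def build_chunk_params(params: Dict[str, Any], record_count: int, chunk_size: int) -> List[Dict[str, Any]]:
    """Return one parameter dictionary per chunk, each with its own offset."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    return [
        {**params, "resultOffset": offset, "resultRecordCount": chunk_size}
        for offset in range(0, record_count, chunk_size)
    ]


# 120 records in chunks of 50 gives offsets 0, 50 and 100.
chunks = build_chunk_params({"where": "1=1", "outFields": "*"}, record_count=120, chunk_size=50)
print([c["resultOffset"] for c in chunks])
```

Each dictionary can then be sent as a separate request and the results concatenated once all chunks have arrived.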

async download(fileformat='geojson', return_geometry=False, where_clause='1=1', output_fields='*', params=None, n_sample_rows=-1)

Download data from Esri server asynchronously.

Parameters:
  • fileformat (str) – The format of the downloaded data (‘geojson’, ‘json’, or ‘csv’). Perhaps best kept as geojson.

  • return_geometry (bool) – Whether to include geometry in the downloaded data.

  • where_clause (str) – The where clause to filter the data.

  • output_fields (str) – The fields to include in the downloaded data.

  • params (Dict[str, Any]) – Additional parameters for the query.

  • n_sample_rows (int) – The number of rows to sample for testing purposes.

Returns:

The downloaded data as a pandas DataFrame or geopandas GeoDataFrame.

Return type:

pd.DataFrame
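
The where_clause argument is an SQL-style filter string, with '1=1' selecting every row. The sketch below shows one plausible shape for such a clause. It is not the output of where_clause_maker(), whose exact format is not documented here; make_in_clause is a hypothetical helper.

```python
from typing import List


def make_in_clause(column_name: str, values: List[str]) -> str:
    """Build a SQL-style IN clause, doubling single quotes inside values."""
    if not values:
        # No filter values: fall back to the match-everything clause.
        return "1=1"
    quoted = ", ".join("'" + value.replace("'", "''") + "'" for value in values)
    return f"{column_name} IN ({quoted})"


where_clause = make_in_clause("WD23NM", ["Brockley"])
print(where_clause)
```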

async looper(session, link_url, params)

Keep trying to connect to the Feature Service until a response is received or max_retries is reached.

Parameters:
  • session (aiohttp.ClientSession) – The aiohttp session.

  • link_url (str) – The URL of the Feature Server service.

  • params (Dict[str, Any]) – The parameters for the query.

Returns:

The downloaded data as a dictionary.

Return type:

Dict[str, Any]
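
The retry behaviour described for looper() follows a common pattern: attempt the request, wait retry_delay seconds after a failure, and give up after max_retries attempts. The sketch below shows that pattern with asyncio only. It is not the library's code; retry_request and flaky_request are hypothetical names, and a stand-in coroutine replaces the real HTTP call.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, Optional


async def retry_request(
    request: Callable[[], Awaitable[Dict[str, Any]]],
    max_retries: int = 10,
    retry_delay: float = 2,
) -> Dict[str, Any]:
    """Call ``request`` until it succeeds or ``max_retries`` attempts have failed."""
    last_error: Optional[Exception] = None
    for attempt in range(1, max_retries + 1):
        try:
            return await request()
        except Exception as error:  # a real client would catch narrower errors
            last_error = error
            if attempt < max_retries:
                await asyncio.sleep(retry_delay)
    raise RuntimeError(f"No response after {max_retries} attempts") from last_error


attempts = {"count": 0}


async def flaky_request() -> Dict[str, Any]:
    """Stand-in for an HTTP call: fails twice, then returns an empty result."""
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise ConnectionError("connection lost")
    return {"features": []}


result = asyncio.run(retry_request(flaky_request, max_retries=5, retry_delay=0))
print(result, attempts["count"])
```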

async setup(full_name=None, service_name=None, layer_name=None, service_table={}, max_retries=10, retry_delay=20, chunk_size=50)

Set up the FeatureServer Service object for downloading.

Parameters:
  • full_name (str) – The full name of the Feature Server service. Provide either this argument or both service_name and layer_name, from which the method builds the full_name.

  • service_name (str) – The name of the Feature Server service. Provide a value together with layer_name.

  • layer_name (str) – The name of the layer to download. Provide a value together with service_name.

  • service_table (Dict[str, Layer]) – Mandatory. A dictionary of Feature Server Layer objects.

  • max_retries (int) – The maximum number of retries for a request.

  • retry_delay (int) – The delay in seconds between retries.

  • chunk_size (int) – The number of records to download in each chunk.

Return type:

None

Returns:

None

class Consensus.EsriConnector.Layer(full_name, service_name, layer_name, id, fields, url, description, primary_key, matchable_fields, lasteditdate, data_from_layers, has_geometry, type)

Bases: object

Dataclass for layers.

full_name

The full name of the layer.

Type:

str

service_name

The name of the service the layer belongs to.

Type:

str

layer_name

The name of the layer.

Type:

str

id

The ID of the layer.

Type:

int

fields

The list of fields in the layer.

Type:

List[str]

url

The URL of the layer.

Type:

str

description

The description of the layer.

Type:

str

primary_key

The primary key of the layer.

Type:

str

matchable_fields

The list of matchable fields in the layer.

Type:

List[str]

lasteditdate

The last edit date of the layer.

Type:

str

data_from_layers

Whether the layer is from a data source.

Type:

bool

has_geometry

Whether the layer has geometry.

Type:

bool

type

The type of the layer.

Type:

str

_record_count(session: aiohttp.ClientSession, proxy: str)

Helper method for asynchronous GET requests using aiohttp. This is used by the FeatureServer class.

_fetch(session: aiohttp.ClientSession, url: str, params: Dict[str, str] = None, proxy: str = None)

Helper method for asynchronous GET requests using aiohttp.

__init__(full_name, service_name, layer_name, id, fields, url, description, primary_key, matchable_fields, lasteditdate, data_from_layers, has_geometry, type)
async _fetch(session, url, params=None, proxy=None)

Helper method for asynchronous GET requests using aiohttp.

Parameters:
  • session (aiohttp.ClientSession) – The aiohttp session object.

  • url (str) – The URL to fetch.

  • params (Dict[str, str]) – Query parameters. Defaults to None.

  • proxy (str) – Proxy string.

Returns:

The response as a JSON object.

Return type:

Dict[str, Any]

async _record_count(session, url, params, proxy)

Helper method for counting records.

Parameters:
  • session (aiohttp.ClientSession) – The aiohttp session object.

  • url (str) – The URL to fetch.

  • params (Dict[str, str]) – Query parameters.

  • proxy (str) – Proxy string that is passed to _fetch() method.

Returns:

The count of records for the chosen FeatureService

Return type:

int
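
On the ArcGIS REST API, a layer's query endpoint returns only a record count when returnCountOnly=true is set. The sketch below builds such a URL. Whether _record_count() issues exactly this request is an assumption, and both build_count_query and the example URL are made up for illustration.

```python
from typing import Dict
from urllib.parse import urlencode


def build_count_query(layer_url: str, where_clause: str = "1=1") -> str:
    """Build a query URL that asks the layer for a record count only."""
    params: Dict[str, str] = {
        "where": where_clause,
        "returnCountOnly": "true",
        "f": "json",
    }
    return f"{layer_url.rstrip('/')}/query?{urlencode(params)}"


# Hypothetical layer URL, for illustration only.
url = build_count_query("https://example.com/arcgis/rest/services/Demo/FeatureServer/0")
print(url)
```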

data_from_layers: bool
description: str
fields: List[str]
full_name: str
has_geometry: bool
id: int
lasteditdate: str
layer_name: str
matchable_fields: List[str]
primary_key: str
service_name: str
type: str
url: str
class Consensus.EsriConnector.Service(name=None, type=None, url=None, description=None, layers=None, tables=None, output_formats=None, metadata=None, fields=None, primary_key=None, field_matching_condition=None)

Bases: object

Dataclass for services.

name

Name of service.

Type:

str

type

One of ‘FeatureServer’, ‘MapServer’, ‘WFSServer’.

Type:

str

url

URL.

Type:

str

description

Description of the service.

Type:

str

layers

Data available through service. If empty, it is likely that the ‘tables’ attribute contains the desired data.

Type:

List[Dict[str, Any]]

tables

Data available through service. If empty, it is likely that the ‘layers’ attribute contains the desired data.

Type:

List[Dict[str, Any]]

output_formats

List of formats available for the data.

Type:

List[str]

metadata

Metadata as JSON.

Type:

Dict

fields

List of fields for the data.

Type:

List[str]

primary_key

Primary key for the data.

Type:

str

field_matching_condition

Condition for matchable fields. This method is used by Service() to filter the fields that are added to the matchable_fields columns, which is subsequently used by SmartLinker() for matching data tables. You can define your own field_matching_condition() method for each Esri server by extending the relevant EsriConnector() sub-class.

Type:

Callable[[Dict[str, str]], bool]

featureservers()

Self-filtering method.

mapservers()

Self-filtering method.

wfsservers()

Self-filtering method.

_fetch(session: aiohttp.ClientSession, url: str, params: Dict[str, str] = None, proxy: str = None)

Helper method for asynchronous GET requests using aiohttp.

service_details(session: aiohttp.ClientSession, proxy: str)

Helper method for asynchronous GET requests using aiohttp. Gets more details about the service.

get_download_urls()

Helper method for getting download URLs.

service_metadata(session: aiohttp.ClientSession, proxy: str)

Helper method for asynchronous GET requests using aiohttp. Gets the metadata for the service.

_matchable_fields()

Gets the matchable fields for the service based on the field_matching_condition() method.

_service_attributes(session: aiohttp.ClientSession, proxy)

Helper method for asynchronous GET requests using aiohttp. Gets the attributes for the service.

get_layers(session: aiohttp.ClientSession, proxy: str)

Main method that creates the lookup data format for the service.

__init__(name=None, type=None, url=None, description=None, layers=None, tables=None, output_formats=None, metadata=None, fields=None, primary_key=None, field_matching_condition=None)
async _fetch(session, url, params=None, proxy=None)

Helper method for asynchronous GET requests using aiohttp.

Parameters:
  • session (aiohttp.ClientSession) – The aiohttp session object.

  • url (str) – The URL to fetch.

  • params (Dict[str, str]) – Query parameters. Defaults to None.

  • proxy (str) – Proxy string.

Returns:

The response as a JSON object.

Return type:

Dict[str, Any]

async _matchable_fields(fields)

Returns a list of matchable fields for the service. It uses the field_matching_condition() method that can be defined for any Esri ArcGIS server.

Returns:

List of matchable fields.

Return type:

List[str]

async _service_attributes(session, proxy)

Fills attribute fields using the JSON information from service_details and service_metadata methods.

Parameters:
  • session (aiohttp.ClientSession) – The aiohttp session object.

  • proxy (str) – Proxy string that is passed to _fetch() method.

Return type:

None

Returns:

None

description: str = None
featureservers()

Self-filtering method.

Returns:

Self if type is ‘FeatureServer’ else None.

Return type:

Service
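
A self-filtering method returns the object itself when it matches and None otherwise, so a list of services can be narrowed with a simple comprehension. The sketch below uses a stand-in dataclass, ServiceSketch, with only two fields; it is not the real Service class, and the service names are made up.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class ServiceSketch:
    """Stand-in for Service with just the fields needed for filtering."""
    name: str
    type: str

    def featureservers(self) -> Optional["ServiceSketch"]:
        """Return self for FeatureServer services, otherwise None."""
        return self if self.type == "FeatureServer" else None


services = [ServiceSketch("Wards", "FeatureServer"), ServiceSketch("Basemap", "MapServer")]

# Keep only the services whose self-filter returns the object.
feature_services: List[ServiceSketch] = [s for s in services if s.featureservers() is not None]
print([s.name for s in feature_services])
```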

field_matching_condition: Callable[[Dict[str, str]], bool] = None
fields: List[str] = None
async get_download_urls()

Returns the download URL for the service.

Parameters:

session (aiohttp.ClientSession) – The aiohttp session object.

Returns:

List of download URLs to visit.

Return type:

List[str]

async get_layers(session, proxy)

Returns a Pandas-ready dictionary of the service’s metadata.

Parameters:
  • session (aiohttp.ClientSession) – The aiohttp session object.

  • proxy (str) – Proxy string that is passed to _fetch() method.

Returns:

A dictionary of the FeatureService’s metadata.

Return type:

Dict[str, List]

layers: List[Dict[str, Any]] = None
mapservers()

Self-filtering method. Currently unused.

Returns:

Self if type is ‘MapServer’ else None.

Return type:

Service

metadata: Dict = None
name: str = None
output_formats: List[str] = None
primary_key: str = None
async service_details(session, proxy)

Returns high-level details for the data as JSON.

Parameters:
  • session (aiohttp.ClientSession) – The aiohttp session object.

  • proxy (str) – Proxy string that is passed to _fetch() method.

Returns:

The service details as a JSON object.

Return type:

Dict[str, Any]

async service_metadata(session, proxy)

Returns metadata as JSON.

Parameters:
  • session (aiohttp.ClientSession) – The aiohttp session object.

  • proxy (str) – Proxy string that is passed to _fetch() method.

Returns:

The metadata as a JSON object.

Return type:

Dict[str, Any]

tables: List[Dict[str, Any]] = None
type: str = None
url: str = None
wfsservers()

Self-filtering method. Currently unused.

Returns:

Self if type is ‘WFSServer’ else None.

Return type:

Service