# Source code for sdss_brain.api.manager

# !/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Filename: manager.py
# Project: api
# Author: Brian Cherinka
# Created: Friday, 23rd October 2020 3:08:17 pm
# License: BSD 3-clause "New" or "Revised" License
# Copyright (c) 2020 Brian Cherinka
# Last Modified: Friday, 23rd October 2020 3:08:18 pm
# Modified By: Brian Cherinka


from __future__ import print_function, division, absolute_import

import os
import pathlib
import warnings
import yaml
from astropy.table import Table
from functools import wraps
from pydantic import BaseModel, validator, parse_obj_as
from typing import List, Dict
from urllib.parse import urlparse, urlunparse
from sdss_brain import log, cfg_params
from sdss_brain.auth import User
from sdss_brain.api.io import send_post_request
from sdss_brain.exceptions import BrainError

try:
    from tabulate import tabulate
except ImportError:
    tabulate = None


__all__ = ['Domain', 'ApiProfileModel', 'ApiProfile', 'ApiManager', 'apim']


def urljoin(url1: str, url2: str) -> str:
    """ Merge two urls component by component

    Splits both urls with `~urllib.parse.urlparse`, combines each pair of
    matching components with `strjoin`, and reassembles the pieces with
    `~urllib.parse.urlunparse`.  `~urllib.parse.urljoin` is deliberately
    avoided since it replaces the existing url path with the new path.

    Parameters
    ----------
    url1 : str
        The base url to join to
    url2 : str
        The url segment to join onto url1

    Returns
    -------
    str
        The joined url
    """

    base_parts = urlparse(url1)
    extra_parts = urlparse(url2)
    # both parse results are (scheme, netloc, path, params, query, fragment)
    merged = [strjoin(base, extra) for base, extra in zip(base_parts, extra_parts)]
    return urlunparse(tuple(merged))


def strjoin(str1: str, str2: str) -> str:
    """ Joins two url strings ignoring a leading /

    Parameters
    ----------
    str1 : str
        The base url component
    str2 : str
        The url component to append to str1

    Returns
    -------
    str
        ``str2`` unchanged when it already starts with ``str1`` (this also
        covers an empty ``str1``), ``str1`` unchanged when ``str2`` is empty,
        otherwise the two components joined by a single forward slash.
    """
    # url components are always "/"-separated; os.path.join would insert the
    # host OS separator (a backslash on Windows), so join with posixpath.
    # A path join is used instead of pathlib since it does not trim trailing slashes.
    import posixpath

    if str2.startswith(str1):
        return str2
    if not str2:
        return str1
    return posixpath.join(str1, str2.lstrip('/'))


# load the domain and API definitions from the package's etc/api_profiles.yml file
_profile_path = (pathlib.Path(__file__).parent.parent / 'etc/api_profiles.yml').resolve()
with open(_profile_path, 'r') as f:
    profiles = yaml.load(f.read(), Loader=yaml.SafeLoader)

# the two top-level yaml sections used throughout this module
domains = profiles['domains']
apis = profiles['apis']


class Domain(BaseModel):
    """ Pydantic class handling validation for SDSS domains

    Attributes
    ----------
    name : str
        The domain name.  Must be "localhost" or end with one of the
        suffixes accepted by `check_domain_name`.
    public : bool
        Flag marking the domain as a public one, by default False
    description : str
        A description of the domain, by default None
    """
    name: str
    public: bool = False
    description: str = None

    @validator('name')
    def check_domain_name(cls, value):
        """ Validate that the domain name is localhost or an accepted SDSS host

        Raises
        ------
        ValueError
            when the name is not "localhost" and does not end with
            "sdss.org", "sdss5.org" or "sdss.utah.edu"
        """
        allowed_suffixes = ('sdss.org', 'sdss5.org', 'sdss.utah.edu')
        if value != 'localhost' and not value.endswith(allowed_suffixes):
            raise ValueError(f'Domain name {value} does not fit format of "xxx.sdss.org" or "xxx.sdss.utah.edu"')
        return value

    def __str__(self):
        return str(self.name)

    def __eq__(self, value):
        """ Compare against a domain name string or another Domain

        A string is equal when it matches ``name``; another Domain is equal
        only when it is this very object.
        """
        if isinstance(value, str):
            return value == self.name
        if isinstance(value, Domain):
            return value is self
        # defer to the other operand rather than implicitly returning None
        return NotImplemented
# validate and process the domains yaml section
domains = dict(zip(domains.keys(), parse_obj_as(List[Domain], list(domains.values()))))


def check_domain(func):
    """ Decorator that checks for a correct domain

    Checks that a given input domain is in the list of valid domains
    for the given API profile.  Also checks for the appropriate inputs
    when the domain is local, i.e. for an additional valid port
    or ngrokid input.  Raises ValueErrors.

    The decorated method is expected to have the signature
    ``(self, domain, port=None, ngrokid=None)``.  Each of ``domain``,
    ``port`` and ``ngrokid`` may be passed positionally or by keyword.
    """

    @wraps(func)
    def wrapper(*args, **kwargs):
        inst = args[0]
        # resolve each argument from its positional slot first, then fall back
        # to the keyword; unpacking ``args`` directly would fail whenever port or
        # ngrokid is passed positionally or domain is passed by keyword
        domain = args[1] if len(args) > 1 else kwargs.get('domain', None)
        port = args[2] if len(args) > 2 else kwargs.get('port', None)
        ngrokid = args[3] if len(args) > 3 else kwargs.get('ngrokid', None)

        if domain not in inst._all_domains:
            raise ValueError(
                f'Input domain "{domain}" not a valid domain/mirror for API profile {inst.name}')

        if (port or ngrokid) and domain != 'local':
            raise ValueError('Domain must be local if a port or ngrokid is set!')

        if domain == 'local' and not (port or ngrokid):
            raise ValueError('A port or ngrokid must be specified when domain is local')

        return func(*args, **kwargs)
    return wrapper
class ApiProfileModel(BaseModel):
    """ Pydantic datamodel validating a single SDSS API profile entry """
    # keys of the domains the API is available on
    domains: List[str]
    # base url path of the API
    base: str
    # keys of the domains acting as mirrors, if any
    mirrors: List[str] = None
    # path stems for the "test" and "public" urls; "affix" places them before or after the base
    stems: Dict[str, str] = {'test': 'test', 'public': 'public', 'affix': 'prefix'}
    # when True an "api" segment is appended to the url path
    api: bool = False
    # presumably the route returning the API's routemap; not used in this module - TODO confirm
    routemap: str = None
    # authentication type and the route used to request a token
    auth: Dict[str, str] = {'type': 'netrc', 'route': None}
    description: str = ''
    # url link to the API documentation
    docs: str = None

    @validator('domains', 'mirrors')
    def domains_in_list(cls, values):
        """ Ensure every listed domain/mirror key exists in the loaded domains """
        unknown = set(values) - set(domains)
        if unknown:
            raise ValueError(f'Not all of the input domains/mirrors are in the list of domains.yml!')
        return values

    @validator('stems')
    def allowed_affixes(cls, values):
        """ Ensure the stems carry the required keys and a valid affix value """
        required = {'test', 'affix'}
        if not required.issubset(values):
            raise ValueError('stems dictionary must contain at least "test" and "affix" keys!')
        if values['affix'] not in ('prefix', 'suffix'):
            raise ValueError('affix value can only be "prefix" or "suffix"!')
        return values
class ApiProfile(object):
    """ Class representing an API profile

    This class provides an interface for a given SDSS API profile.  It provides
    convenience methods for easily switching domain hosts for a given API, switching
    between production and development paths, and constructing full url route paths.
    It uses the `~urllib.parse.urlparse` URL scheme,
    "scheme://netloc/path;parameters?query#fragment".

    Parameters
    ----------
    name : str
        The name of the API profile
    domain : str, optional
        The name of the domain to use, by default None
    port : int, optional
        The port used for localhost domains, by default None
    ngrokid : int, optional
        The ngrok id used for localhost domains, by default None
    release : str, optional
        Accepted but currently not used by this class, by default None
    test: bool, optional
        If True, use the development url, by default None

    Attributes
    ----------
    description : str
        A description of the API
    documentation : str
        A url link to any documentation of the API
    auth_type : str
        The type of authentication needed for the API
    domains : dict
        The available domains this API can be accessed on
    mirrors : dict
        The available domains acting as mirrors, or None when there are none
    url : str
        The current constructed base API url
    current_domain : Domain
        The current domain the API is set to use
    name : str
        The name of the API profile
    token: str
        The authenticated token, if any
    info : dict
        A dictionary of information extracted from Pydantic datamodel

    Raises
    ------
    ValueError
        when input name does not match a valid API profile
    """

    def __init__(self, name: str, domain: str = None, port: int = None,
                 ngrokid: int = None, release: str = None, test: bool = None) -> None:
        self.name = name

        # load and validate the name from the list of API profiles
        if name not in apis:
            raise ValueError(f'API Profile {name} not in the apis.yml file. Consider adding it.')

        self._validated_model = ApiProfileModel.parse_obj(apis[name])
        self.info = self._validated_model.dict()
        self.description = self.info.get('description', '')
        self.documentation = self.info.get('docs', '')

        # load the domains and mirrors
        self.domains = self._get_domains()
        self.mirrors = self._get_domains(mirror=True)
        self._all_domains = {**self.domains, **(self.mirrors or {})}

        # select the current domain; default to the first listed one
        domain = domain or next(iter(self.domains))
        self._select_current_domain(domain, port=port, ngrokid=ngrokid)

        # construct the API url
        self.url = self.construct_url(test=test)

        # set auth type
        self._set_auth_type()

    def __repr__(self) -> str:
        return f'<ApiProfile("{self.name}", current_domain="{self.current_domain}", url="{self.url}")>'

    def __str__(self) -> str:
        return self.name.lower()

    def _set_auth_type(self) -> None:
        """ Sets the API authentication type

        Public and local domains get no authentication type; otherwise the
        type comes from the profile "auth" entry, defaulting to "netrc".
        """
        if self.is_domain_public is True or 'local' in str(self.current_domain):
            self.auth_type = None
        else:
            self.auth_type = self.info['auth'].get('type', 'netrc')

    @property
    def token(self) -> str:
        """ Returns an authentication token """
        return self.check_for_token()

    def check_for_token(self) -> str:
        """ Checks for a proper auth token set as a envvar or in custom config

        Looks up "XXX_API_TOKEN" in the environment first, then "xxx_api_token"
        in the config parameters, where "XXX" is the API profile name.

        Returns
        -------
        str
            The token, or None when neither location provides one
        """
        token = f'{self.name.upper()}_API_TOKEN'
        return os.getenv(token) or cfg_params.get(token.lower(), None)

    def construct_token_url(self) -> str:
        """ Construct a login url for requesting tokens

        Returns
        -------
        str
            The full url of the token route, or None when the API auth
            type is not "token"

        Raises
        ------
        ValueError
            when the API profile has no token route specified
        """
        # check the API auth_type; default to "netrc" like _set_auth_type does
        auth = self.info.get('auth', None) or {}
        if auth.get('type', 'netrc') != 'token':
            log.info(f'Auth type for API {self.name} is not "token".  No token needed.')
            return

        # get the token route for the given API
        token_route = auth.get('route', None)
        if not token_route:
            raise ValueError(f'No token route specified for API profile {self.name}. '
                             'I do not where to request a token from.')

        return self.construct_route(token_route)

    def get_token(self, user: str) -> str:
        """ Request and receive a valid API auth token

        Requests an auth token for the specified user.  This uses found netrc
        authentication to attempt to request and retrieve a valid token.  The token should
        be saved in an "XXX_API_TOKEN" environment variable or in the custom
        sdss_brain.yml configuration file as "xxx_api_token", where "XXX" is the
        API profile name.

        Parameters
        ----------
        user : str
            The name of the SDSS user

        Returns
        -------
        str
            A valid API auth token, or None when the API auth type is not "token"

        Raises
        ------
        BrainError
            when the user is not netrc validated
        BrainError
            when a token cannot be extracted from the http response
        """
        # an already configured token short-circuits the request
        if self.token:
            return self.token

        auth = self.info.get('auth', None) or {}
        if auth.get('type', 'netrc') != 'token':
            log.info(f'Auth type for API {self.name} is not "token".  No token needed.')
            return

        if isinstance(user, str):
            user = User(user)

        if not user.validated and not user.is_netrc_valid:
            raise BrainError(f'User {user.name} is not netrc validated!  Cannot access credentials.')

        username, password = user.netrc.read_netrc('api.sdss.org')
        token_url = self.construct_token_url()
        data = send_post_request(token_url, data={'username': username, 'password': password})

        # use the first token key present in the response
        token_keys = ('token', 'access_token', 'user_token', 'sdss_token')
        token = next((data[key] for key in token_keys if key in data), None)
        if not token:
            raise BrainError('Token request successful but could not extract token '
                             'from response data. Check the returned json response '
                             'for prope key name')

        tok_name = f'{self.name.upper()}_API_TOKEN'
        log.info(f'Save this token as either a "{tok_name}" environment variable in your '
                 f'.bashrc or as "{tok_name.lower()}" in your custom sdss_brain.yml config file.')
        return token

    @property
    def is_domain_public(self) -> bool:
        """ Checks if current domain is a public one """
        return self.current_domain.public

    @check_domain
    def _select_current_domain(self, domain: str, port: int = None, ngrokid: int = None) -> None:
        """ Set the current domain, recording the local netloc when needed """
        if len(self._all_domains) == 1:
            warnings.warn(f'Only one domain available for API profile {self.name}. '
                          'Selecting that one.')
            domain = next(iter(self._all_domains))

        # current_domain is always a Domain object; the "host:port" or ngrok
        # netloc of a local domain is tracked separately so that the url
        # construction and the public/auth checks all keep working
        self.current_domain = self._all_domains.get(domain, None)
        self._local_netloc = self._create_local_domain(port, ngrokid) if domain == 'local' else None

    def _get_domains(self, mirror: bool = None) -> dict:
        """ Get the subset of domains valid for the given API

        Looks up the profile "domains" entry, or the "mirrors" entry when
        ``mirror`` is set, and maps each key to its validated domain.  The
        "local" domain is automatically added to the "domains" entry.

        Parameters
        ----------
        mirror : bool, optional
            If True, looks for and sets any mirror domains, by default None

        Returns
        -------
        dict
            The set of valid domains for the given API, or None when
            mirrors are requested and the profile has none

        Raises
        ------
        ValueError
            when the API profile has no domains entry set
        """
        key = 'mirrors' if mirror else 'domains'
        profile_domains = self.info.get(key, None)

        # validate before adding "local", otherwise the check can never trigger
        if not profile_domains:
            if mirror:
                return None
            raise ValueError(f'Profile {self.name} has no "domains" entry set!')

        # add localhost automatically, once
        if key == 'domains' and 'local' not in profile_domains:
            profile_domains.append('local')

        return {i: domains[i] for i in profile_domains}

    def construct_url(self, test: bool = None, public: bool = None) -> str:
        """ Constructs a new base url

        Constructs a new url given the currently set domain name and the
        API base name.  Can optionally specify a development base using
        the test or public keywords.

        Parameters
        ----------
        test : bool, optional
            If True, add the "test" stem for the development path, by default None
        public : bool, optional
            If True, adds the "public" stem for the development path, by default None

        Returns
        -------
        str
            The constructed base url
        """
        if self._local_netloc:
            # local domains use http, consistent with change_domain
            scheme, netloc = 'http', self._local_netloc
        else:
            scheme, netloc = 'https', self.current_domain.name
        path = self._create_base_stem(test, public)
        return urlunparse((scheme, netloc, path, '', '', ''))

    def construct_route(self, route: str) -> str:
        """ Construct a full url to an input route

        Constructs a full url path to the input route.

        Parameters
        ----------
        route : str
            The route component url path

        Returns
        -------
        str
            The full route url

        Example
        -------
        >>> p = ApiProfile('marvin')
        >>> p.url
        'https://sas.sdss.org/marvin/api'
        >>> p.construct_route('general/getroutemap')
        'https://sas.sdss.org/marvin/api/general/getroutemap'
        """
        return urljoin(self.url, route)

    def _create_local_domain(self, port: int, ngrokid: int) -> str:
        """ Create a local domain name

        An ngrok id takes precedence over a port.
        """
        if ngrokid:
            domain = f'{ngrokid}.ngrok.io'
        else:
            domain = f'{self.domains["local"].name}:{port}'
        return domain

    @check_domain
    def change_domain(self, domain: str, port: int = None, ngrokid: int = None) -> None:
        """ Change the url domain

        Updates the url "netloc" segment to use the domain name provided.  If the
        domain is "local", also needs either a port number or ngrok id to fully
        construct a local domain name.

        Parameters
        ----------
        domain : str
            The name of the domain to use
        port : int, optional
            The port used for localhost domains, by default None
        ngrokid : int, optional
            The ngrok id used for localhost domains, by default None

        Example
        -------
        >>> p = ApiProfile('marvin')
        >>> p.url
        'https://sas.sdss.org/marvin/api'
        >>> p.change_domain('dr15')
        >>> p.url
        'https://dr15.sdss.org/marvin/api'
        """
        if domain == 'local':
            scheme = 'http'
            netloc = self._local_netloc = self._create_local_domain(port, ngrokid)
        else:
            scheme = 'https'
            netloc = self._all_domains[domain].name
            self._local_netloc = None

        # swap only the scheme and netloc; the path and other segments are kept
        self.url = urlunparse(urlparse(self.url)._replace(scheme=scheme, netloc=netloc))

        # reset current domain and auth_type
        self.current_domain = self._all_domains[domain]
        self._set_auth_type()

    def _create_base_stem(self, test: bool, public: bool) -> str:
        """ Create a new path stem

        Starts from the profile "base" and adds the "test" and/or "public"
        stem as a prefix or suffix, depending on the "affix" setting.  When
        the profile "api" flag is set, "api" is appended as a final segment.
        """
        base = self.info['base']
        api = self.info.get('api', False)

        path = f'{base}'
        if test or public:
            stems = self.info.get('stems', None)
            affix = stems.get('affix', 'prefix')
            testfix = stems.get('test', 'test')
            publicfix = stems.get('public', 'public')
            if test:
                path = f'{testfix}/{path}' if affix == 'prefix' else f'{path}/{testfix}'
            if public:
                path = f'{publicfix}/{path}' if affix == 'prefix' else f'{path}/{publicfix}'

        if api:
            path = f'{path}/api'
        return path

    def change_path(self, test: bool = None, public: bool = None) -> None:
        """ Change the url path

        Updates the url "path" segment.  Called without arguments, will update the
        path to the production base.  If either "test" or "public" is set, will
        update the path with the corresponding stems, used to switch to development urls.

        Parameters
        ----------
        test : bool, optional
            If True, add the "test" stem for the development path, by default None
        public : bool, optional
            If True, adds the "public" stem for the development path, by default None

        Example
        -------
        >>> p = ApiProfile('marvin')
        >>> p.url
        'https://sas.sdss.org/marvin/api'
        >>> p.change_path(test=True)
        >>> p.url
        'https://sas.sdss.org/test/marvin/api'
        """
        path = self._create_base_stem(test, public)
        self.url = urlunparse(urlparse(self.url)._replace(path=path))
class ApiManager(object):
    """ Manager for the available SDSS APIs

    Provides a single place to look up the available SDSS domains and API
    profiles, and to toggle which API profile is currently in use for
    remote requests.

    Attributes
    ----------
    domains : dict
        A dictionary of available SDSS domains
    apis : dict
        A dictionary of available SDSS APIs
    profile : Type[ApiProfile]
        The currently selected API to use
    """

    def __init__(self) -> None:
        self.domains = domains
        # one ApiProfile per entry in the loaded API definitions
        self.apis = {key: ApiProfile(key) for key in apis}
        self.profile = None

    def __repr__(self) -> str:
        current = str(self.profile)
        return (f'<ApiManager(current_api="{current}", n_domains="{len(self.domains)}", '
                f'n_apis="{len(self.apis)}")>')

    def list_apis(self) -> list:
        """ List the available SDSS APIs

        Returns
        -------
        list
            The list of available SDSS API profiles
        """
        return [*self.apis.values()]

    def list_domains(self) -> list:
        """ List the available SDSS domains

        Returns
        -------
        list
            The list of available SDSS domains
        """
        return [*self.domains.values()]

    def set_profile(self, name: str, domain: str = None, test: bool = None) -> None:
        """ Set the current API profile

        Makes the named profile the current one, optionally switching its
        domain and/or moving it onto the development path.

        Parameters
        ----------
        name : str
            The name of the API
        domain: str, optional
            The name of the domain to switch to, by default None
        test: bool, optional
            If True, sets the API profile to development, by default None

        Raises
        ------
        ValueError
            when the name is not one of the available APIs
        """
        if name not in self.apis:
            raise ValueError(f'Input profile {name} not an available SDSS API.')

        selected = self.apis[name]
        self.profile = selected

        if domain:
            selected.change_domain(domain)

        if test:
            selected.change_path(test=test)

    def identify_api_from_url(self, url: str) -> tuple:
        """ Identify and extract an API and domain type from a URL.

        Splits the input url with `~urllib.parse.urlparse` and compares its
        parts against every API profile and domain in the ApiManager.  An API
        matches when its base is contained in the url path; a domain matches
        when its name equals the url netloc.  The last match found is used.

        Parameters
        ----------
        url : str
            The full url string

        Returns
        -------
        tuple
            The identified API profile name and domain key; either is None
            when nothing matched

        Raises
        ------
        ValueError
            when the url does not start with http
        """
        if not url.startswith('http'):
            raise ValueError('Input url does not start with http.')

        # extract url parts
        parsed = urlparse(url)

        # collect every match and keep the last one, if any
        api_hits = [prof.name for prof in self.apis.values() if prof.info['base'] in parsed.path]
        domain_hits = [key for key, dom in self.domains.items() if dom.name == parsed.netloc]

        api = api_hits[-1] if api_hits else None
        domain = domain_hits[-1] if domain_hits else None
        return api, domain

    def display(self, value: str, pprint: bool = False, show_docs: bool = True,
                show_desc: bool = True, **kwargs) -> Table:
        """ Display the APIs or domains as an Astropy Table

        Builds an Astropy Table listing the available SDSS APIs or domains.

        Parameters
        ----------
        value : str
            Either "api(s)" or "domain(s)"
        pprint : bool, optional
            If True, pretty print the Astropy Table, by default False
        show_docs : bool, optional
            If True, include the "docs" column in the API table, by default True
        show_desc : bool, optional
            If True, include the "description" column in the API table, by default True
        kwargs: Any
            Other kwargs for Table.pprint (pretty print) method

        Returns
        -------
        `~astropy.table.Table`
            An Astropy Table of information, or None when ``pprint`` is set

        Raises
        ------
        TypeError
            when the input value is not a string
        ValueError
            when the input value is not either "apis" or "domains"
        """
        if type(value) is not str:
            raise TypeError('Value can only be a string')

        value = value.lower()
        if value not in ('apis', 'api', 'domains', 'domain'):
            raise ValueError('Value can only be "apis" or "domains"!')

        if 'domain' in value:
            # one row per domain, tagged with its lookup key
            cols = ['key', 'name', 'public', 'description']
            rows = [{**dom.dict(), 'key': key} for key, dom in self.domains.items()]
        else:
            # drop the optional columns that were switched off
            hidden = set()
            if not show_docs:
                hidden.add('docs')
            if not show_desc:
                hidden.add('description')
            all_cols = ['key', 'base', 'description', 'domains', 'mirrors', 'auth', 'docs']
            cols = [col for col in all_cols if col not in hidden]

            rows = []
            for key, prof in self.apis.items():
                row = {field: item for field, item in prof.info.items() if field in cols}
                mirror_keys = row['mirrors']
                # flatten the list and dict entries into plain strings
                row.update({
                    'domains': ', '.join(row['domains']),
                    'mirrors': ', '.join(mirror_keys) if mirror_keys else '',
                    'auth': row['auth']['type'],
                    'key': key,
                })
                rows.append(row)

        table = Table(rows, names=cols)
        if pprint:
            table.pprint(**kwargs)
            return
        return table

    def generate_rst_table(self, value: str, show_docs: bool = True, show_desc: bool = True,
                           **kwargs) -> str:
        """ Generate a rst-formatted table

        Generates an rst-formatted table of the available domains or APIs using the
        `tabulate <https://github.com/astanin/python-tabulate>`_ python package.  This
        method is good for dropping tables into Sphinx documentation.

        Parameters
        ----------
        value : str
            Either "api(s)" or "domain(s)"
        show_docs : bool, optional
            If True, include the "docs" column in the API table, by default True
        show_desc : bool, optional
            If True, include the "description" column in the API table, by default True
        kwargs: Any
            Other kwargs for the tabulate method

        Returns
        -------
        str
            The rst formatted table as a string

        Raises
        ------
        ImportError
            when the tabulate package is not installed.
        """
        if not tabulate:
            raise ImportError('package tabulate not found.  Cannot generate rst table.')

        # build the Astropy table, then hand its numpy array form to tabulate
        astro_table = self.display(value, show_docs=show_docs, show_desc=show_desc)
        array = astro_table.as_array()
        return tabulate(array, headers='keys', tablefmt='rst', disable_numparse=True, **kwargs)
# module-level ApiManager instance, created at import time and exported through ``__all__``
apim = ApiManager()