Source code for sdss_brain.mixins.access

# !/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Filename: access.py
# Project: sdss_brain
# Author: Brian Cherinka
# Created: Friday, 2nd October 2020 3:24:02 pm
# License: BSD 3-clause "New" or "Revised" License
# Copyright (c) 2020 Brian Cherinka
# Last Modified: Friday, 2nd October 2020 3:24:02 pm
# Modified By: Brian Cherinka


from __future__ import print_function, division, absolute_import
import abc
import time
import warnings
from functools import wraps
from typing import Type

from sdss_brain import log
from sdss_brain.exceptions import BrainError, BrainMissingDependency, BrainUserWarning
from sdss_brain.config import config

try:
    from sdss_access import Access
except ImportError:
    Access = None

__all__ = ['AccessMixIn']


def create_new_access(release: str) -> 'Access':
    ''' Create a new sdss_access instance

    A release whose name contains "DR" (compared case-insensitively) is
    treated as a public data release: the returned ``Access`` is created
    with ``public=True`` and the lowercased release name.  Any other release
    yields ``Access(public=False, release=None)``.

    Parameters
    ----------
        release : str
            The sdss data release, e.g. "DR16"

    Returns
    -------
        access : Access
            A newly constructed ``sdss_access.Access`` instance

    Raises
    ------
        BrainMissingDependency
            If sdss_access could not be imported
    '''
    # report the missing dependency first, before the release string is touched,
    # so a bad release value cannot mask the real problem
    if not Access:
        raise BrainMissingDependency('sdss_access is not installed')

    # check for public release; done on the lowercased name so that "dr16" and
    # "DR16" behave the same, matching the lowercasing done in set_access
    lowered = release.lower()
    is_public = 'dr' in lowered
    rsync_release = lowered if is_public else None

    return Access(public=is_public, release=rsync_release)


def set_access(func):
    ''' Decorator that keeps the instance ``_access`` attribute current

    Before calling the wrapped function, looks at the first positional
    argument (the instance).  A fresh sdss_access instance is built with
    ``create_new_access`` when the instance has no ``_access`` yet, or when
    the release stored on ``_access`` no longer matches the instance release.
    Any release containing "work" is compared against "sdsswork"; every
    other release is compared by its lowercased name.

    '''
    @wraps(func)
    def wrapper(*args, **kwargs):
        obj = args[0]
        current = obj._access

        # work out which release name the access object is expected to carry
        lowered = obj.release.lower()
        expected = 'sdsswork' if 'work' in lowered else lowered

        # rebuild when nothing is set yet or the releases have drifted apart
        if current is None or expected != current.release:
            obj._access = create_new_access(obj.release)

        return func(*args, **kwargs)
    return wrapper


def check_access_params(func):
    '''Decorator that checks for correct output from set_access_path_params

    Validates, on the first positional argument (the instance), that
    ``path_name`` and ``path_params`` exist and are non-empty and that
    ``path_params`` is a dictionary (dict subclasses are accepted).  When the
    instance has no ``filename`` and some ``path_params`` values are falsy,
    a warning is issued and every value is converted to a string.

    All checks are skipped when the wrapped function is called with a truthy
    ``force_file`` keyword argument.

    Raises
    ------
        AssertionError
            If any of the path_name / path_params checks fail
    '''

    @wraps(func)
    def wrapper(*args, **kwargs):
        inst = args[0]

        # an explicit filename request needs no access parameters at all
        if kwargs.get('force_file', None):
            return func(*args, **kwargs)

        # NOTE: kept as asserts so callers continue to see AssertionError
        assert hasattr(
            inst, 'path_name'), f'{inst.__class__.__name__} must have a "path_name" class attribute'
        assert hasattr(
            inst, 'path_params'), 'set_access_path_params must set a "path_params" attribute'
        assert inst.path_name, 'the path_name attribute cannot be None'
        assert inst.path_params, 'the path_params attribute cannot be None'
        # isinstance rather than an exact type match, so dict subclasses pass
        assert isinstance(inst.path_params, dict), 'the path_params attribute must be a dictionary'

        if inst.filename is None and not all(inst.path_params.values()):
            warnings.warn('Not all path_params are set.  Check how path_params are set '
                          'or for a mismatch between path_params and any extracted parameters '
                          'from _parse_input.  Ensuring any None path_params are set as strings')
            # stringify every value so the template can still be filled in
            inst.path_params = {key: str(value) for key, value in inst.path_params.items()}
        return func(*args, **kwargs)
    return wrapper


class AccessMixIn(abc.ABC):
    ''' Mixin for implementing multi-modal data access

    This is a class that adds support for dynamic path operations using `sdss_access`.
    Given a template path name and a defined set of template keyword argument,
    provides convenience methods for constructing the full local or url-based pathname,
    downloading the file with `sdss_access`.  Also provides the complete
    `~sdss_access.sync.access.Access` object as a property for full range of functionality.
    The ``access`` property automatically reconfigures itself according to the specified
    data release on each call.

    This mixin contains one abstractmethod you must override when subclassing.

    - **_set_access_path_params**: sets the arguments needed by `sdss_access`

    NOTE(review): the methods below read ``self.release``, ``self.filename``,
    ``self._version`` and (optionally) ``self._path_defaults``, none of which are
    defined in this mixin; they are presumably supplied by the host class.

    Parameters
    ----------
        release : str
            The data release of the object, e.g. "DR16"

    Attributes
    ----------
        path_name : str
            The `sdss_access` template path name
        path_params : dict
            The set of `sdss_access` template path keyword arguments
        access : ~sdss_access.sync.access.Access
            An instance of `sdss_access` using for all path creation and file downloads
    '''
    path_name: str = None

    def __init__(self, *args: str, **kwargs: str):
        # fall back to the globally configured release when none is given
        self._release = kwargs.get('release', None) or config.release

        # sdss_access attributes
        self._access = None
        self.path_params = None
        self._setup_access()
        super().__init__(*args, **kwargs)

    @property
    @set_access
    def access(self) -> 'Access':
        ''' Returns an instance of `~sdss_access.sync.access.Access` '''
        return self._access

    @abc.abstractmethod
    def _set_access_path_params(self) -> None:
        ''' Set the sdss_access path parameters

        This method must be overridden by each subclass and must set at least
        one parameter, "path_params", which specify parameters to be passed to
        sdss_access.  "path_name" must also be specified as a class attribute.

        Attributes
        ----------
            path_name : str
                Required. The sdss_access template path key name.  Must be set on the class.
            path_params : dict
                Required. The keywords needed to fill out the sdss_access template path
        '''

    @check_access_params
    def get_full_path(self, url: bool = None, force_file: bool = None) -> str:
        """ Returns the full path of the file in the tree.

        Parameters
        ----------
            url : bool
                If True, specifies the url location rather than the local file location
            force_file : bool
                If True, explicitly returns any set filename attribute instead of
                constructing a path from keyword arguments.

        Returns
        -------
            fullpath : str
                The full path as built by sdss_access, or None when sdss_access
                failed with anything other than a TypeError

        Raises
        ------
            BrainError
                If sdss_access raises a TypeError while building the path
        """

        if force_file:
            return self.filename

        log.debug(f'getting full path for {self.path_name} and params {self.path_params}')

        msg = 'sdss_access was not able to retrieve the full path of the file.'
        fullpath = None
        try:
            if url:
                fullpath = self.access.url(self.path_name, **self.path_params)
            else:
                fullpath = self.access.full(self.path_name, **self.path_params)
        except TypeError as ee:
            # bad keyword arguments are a caller error: warn, then escalate
            warnings.warn(f'{msg} Error: {ee}', BrainUserWarning)
            raise BrainError(f'Bad input type for sdss_access: {ee}') from ee
        except Exception as ee:
            # best effort: any other failure only warns and yields None
            warnings.warn(f'{msg} Error: {ee}', BrainUserWarning)
        return fullpath

    def _setup_access(self, params: dict = None, origin: str = None) -> None:
        ''' Set up the initial access parameters

        Sets up an initial default path_params dictionary.  Given a provided
        `path_name` class attribute, looks up the path keyword arguments and
        creates instance properties, as well as a default `path_params` dictionary.
        If "params" is specified then properties and `path_params` is updated
        from that input.

        When origin is "object" and a work version exists for a key, that
        version is written back into the provided "params" dictionary in place.

        Parameters
        ----------
            params : dict
                A dictionary of access path params
            origin : str
                Indicates the origin of the content calling this method.
                Either "file" or "object"

        Raises
        ------
            TypeError
                If a non-empty "params" is not a dictionary
        '''
        assert origin in [None, 'file', 'object'], 'origin can only be file or object'

        # do nothing if no path_name set
        if not hasattr(self, 'path_name') or not self.path_name:
            return

        # validate the input once, up front, rather than once per key
        if params and not isinstance(params, dict):
            raise TypeError('the path_params attribute must be a dictionary')

        # look up the access keys and create attributes
        keys = self.access.lookup_keys(self.path_name)
        log.debug(f"setting up initial access keys for {keys} for {self.path_name}")
        for k in keys:
            # look up a possible work version
            work_ver = self._version.get(k, None)
            vmsg = ('Version extracted from file is different than your preset "work" '
                    f'version for {k}. Consider updating the configured work version or '
                    'specifying an input version.')

            # look for a default value
            default = self._path_defaults.get(k, None) if hasattr(
                self, '_path_defaults') else None

            # set the work version as default if no default found
            if not default and work_ver and origin == 'object':
                default = work_ver

            # get the attribute value to set
            if params:
                # check if work_ver should supersede the path_param
                if origin == 'object' and work_ver:
                    params[k] = work_ver

                attr_value = params.get(k, default)
            else:
                attr_value = default

            # skip if a class attribute already exists
            if hasattr(self.__class__, k):
                # but first check the work version and issue a warning if a mismatch is found
                if work_ver and origin == 'file' and work_ver != attr_value:
                    warnings.warn(vmsg)
                continue

            # set attributes on the instance
            setattr(self, k, attr_value)

            # issue a warning if the preset work version mismatches from the one
            # extracted from the file
            if origin == 'file' and work_ver and work_ver != getattr(self, k, None):
                warnings.warn(vmsg)

        # create a default path params dictionary
        self.path_params = {k: getattr(self, k) for k in keys}

    @check_access_params
    def download(self) -> None:
        """ Download the file using sdss_access

        Queues the templated path on the access object, commits the
        transfer, and stores the first resulting path as ``self.filename``.
        """
        self.access.remote()
        self.access.add(self.path_name, **self.path_params)
        self.access.set_stream()
        self.access.commit()
        paths = self.access.get_paths()
        # adding a millisecond pause for download to finish and file existence to register
        time.sleep(0.001)
        self.filename = paths[0]  # doing this for single files, may need to change