Source code for b3j0f.aop.advice

# -*- coding: utf-8 -*-

# --------------------------------------------------------------------
# The MIT License (MIT)
#
# Copyright (c) 2014 Jonathan Labéjof <jonathan.labejof@gmail.com>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# --------------------------------------------------------------------

"""Provides functions in order to weave/unweave/get advices from callable
objects."""

from re import compile as re_compile

from uuid import uuid4 as uuid

from inspect import getmembers, isroutine, isclass

from opcode import opmap

try:
    from threading import Timer
except ImportError:
    from dummy_threading import Timer

from .joinpoint import (
    _unapply_interception, is_intercepted, _get_function, Joinpoint, find_ctx,
    super_method, get_intercepted, base_ctx
)

from six import string_types, callable

__all__ = [
    'AdviceError', 'get_advices', 'weave', 'unweave', 'weave_on', 'Advice'
]

# consts for interception loading
LOAD_GLOBAL = opmap['LOAD_GLOBAL']
LOAD_CONST = opmap['LOAD_CONST']

WRAPPER_ASSIGNMENTS = ('__doc__', '__annotations__', '__dict__', '__module__')

_ADVICES = '_advices'  #: target advices attribute name


[docs]class AdviceError(Exception): """Handle Advice errors."""
class _Joinpoint(Joinpoint): """Manage target execution with Advices. Advices are callable objects which take in parameter a Joinpoint. Joinpoint provides to advices: - the target, - target call arguments as args and kwargs property, - a shared context during interception such as a dictionary. """ def get_advices(self, target): result = get_advices(target, ctx=self.ctx) return result def _add_advices(target, advices): """Add advices on input target. :param Callable target: target from where add advices. :param advices: advices to weave on input target. :type advices: routine or list. :param bool ordered: ensure advices to add will be done in input order """ interception_fn = _get_function(target) target_advices = getattr(interception_fn, _ADVICES, []) target_advices += advices setattr(interception_fn, _ADVICES, target_advices) def _remove_advices(target, advices, ctx): """Remove advices from input target. :param advices: advices to remove. If None, remove all advices. """ # if ctx is not None if ctx is not None: # check if intercepted ctx is ctx _, intercepted_ctx = get_intercepted(target) if intercepted_ctx is None or intercepted_ctx is not ctx: return interception_fn = _get_function(target) target_advices = getattr(interception_fn, _ADVICES, None) if target_advices is not None: if advices is None: target_advices = [] else: target_advices = [ advice for advice in target_advices if advice not in advices ] if target_advices: # update target advices setattr(interception_fn, _ADVICES, target_advices) else: # free target advices if necessary delattr(interception_fn, _ADVICES) _unapply_interception(target, ctx=ctx)
[docs]def get_advices(target, ctx=None, local=False): """Get element advices. :param target: target from where get advices. :param ctx: ctx from where get target. :param bool local: If ctx is not None or target is a method, if True (False by default) get only target advices without resolving super target advices in a super ctx. :return: list of advices. :rtype: list """ result = [] if is_intercepted(target): # find ctx if not given if ctx is None: ctx = find_ctx(target) # find advices among super ctx if same intercepted/interception if ctx is not None: # get target name target_name = target.__name__ # resolve target and _target_ctx _target_ctx = ctx _target = getattr(_target_ctx, target_name, None) # check if _target is intercepted if is_intercepted(_target): # get intercepted_target in order to compare with super targets intercepted_target, intercepted_ctx = get_intercepted(_target) # if ctx is not a class if not isclass(_target_ctx): if intercepted_ctx is _target_ctx: interception_fn = _get_function(_target) advices = getattr(interception_fn, _ADVICES, []) result += advices _target_ctx = _target_ctx.__class__ _target = getattr(_target_ctx, target_name, None) # climb back class hierarchy tree through all super targets while _target is not None and _target_ctx is not None: # check if _target is intercepted if is_intercepted(_target): # get intercepted ctx intercepted_fn, intercepted_ctx = get_intercepted( _target ) # if intercepted ctx is ctx if intercepted_ctx is _target_ctx: # get advices from _target interception interception_fn = _get_function(_target) advices = getattr(interception_fn, _ADVICES, []) result += advices # update _target _target, _target_ctx = super_method( name=target_name, ctx=_target_ctx ) else: # else _target_ctx is intercepted_ctx _target_ctx = intercepted_ctx # and update _target _target = getattr(_target_ctx, target_name, None) else: # else, intercepted_fn is _target function intercepted_fn = _get_function(_target) _target, _target_ctx = super_method( name=target_name, ctx=_target_ctx ) # if intercepted are different, stop iteration if intercepted_target is not intercepted_fn: break if local: # break if local has been requested break else: # get advices from interception function interception_function = _get_function(target) result = getattr(interception_function, _ADVICES, []) return result
def _namematcher(regex): """Checks if a target name matches with an input regular expression.""" matcher = re_compile(regex) def match(target): target_name = getattr(target, '__name__', '') result = matcher.match(target_name) return result return match def _publiccallable(target): """ :return: True iif target is callable and name does not start with '_' """ result = ( callable(target) and not getattr(target, '__name__', '').startswith('_') ) return result
[docs]def weave( target, advices, pointcut=None, ctx=None, depth=1, public=False, pointcut_application=None, ttl=None ): """Weave advices on target with input pointcut. :param callable target: target from where checking pointcut and weaving advices. :param advices: advices to weave on target. :param ctx: target ctx (class or instance). :param pointcut: condition for weaving advices on joinpointe. The condition depends on its type. :type pointcut: - NoneType: advices are weaved on target. - str: target name is compared to pointcut regex. - function: called with target in parameter, if True, advices will be weaved on target. :param int depth: class weaving depthing. :param bool public: (default True) weave only on public members. :param routine pointcut_application: routine which applies a pointcut when required. _Joinpoint().apply_pointcut by default. Such routine has to take in parameters a routine called target and its related function called function. Its result is the interception function. :param float ttl: time to leave for weaved advices. :return: the intercepted functions created from input target or a tuple with intercepted functions and ttl timer. :rtype: list :raises: AdviceError if pointcut is not None, not callable neither a str. """ result = [] # initialize advices if isroutine(advices): advices = [advices] if advices: # initialize pointcut # do nothing if pointcut is None or is callable if pointcut is None or callable(pointcut): pass # in case of str, use a name matcher elif isinstance(pointcut, string_types): pointcut = _namematcher(pointcut) else: error_msg = "Wrong pointcut to check weaving on {0}." error_msg = error_msg.format(target) advice_msg = "Must be None, or be a str or a function/method." right_msg = "Not {0}".format(type(pointcut)) raise AdviceError( "{0} {1} {2}".format(error_msg, advice_msg, right_msg) ) if ctx is None: ctx = find_ctx(elt=target) _weave( target=target, advices=advices, pointcut=pointcut, depth=depth, depth_predicate=_publiccallable if public else callable, ctx=ctx, intercepted=result, pointcut_application=pointcut_application ) if ttl is not None: kwargs = { 'target': target, 'advices': advices, 'pointcut': pointcut, 'depth': depth, 'public': public, 'ctx': ctx } timer = Timer(ttl, unweave, kwargs=kwargs) timer.start() result = result, timer return result
def _weave( target, advices, pointcut, ctx, depth, depth_predicate, intercepted, pointcut_application ): """Weave deeply advices in target. :param callable target: target from where checking pointcut and weaving advices. :param advices: advices to weave on target. :param ctx: target ctx (class or instance). :param pointcut: condition for weaving advices on joinpointe. The condition depends on its type. :type pointcut: - NoneType: advices are weaved on target. - str: target name is compared to pointcut regex. - function: called with target in parameter, if True, advices will be weaved on target. :param int depth: class weaving depthing. :param list intercepted: list of intercepted targets. :param routine pointcut_application: routine which applies a pointcut when required. _Joinpoint().apply_pointcut by default. Such routine has to take in parameters a routine called target and its related function called function. Its result is the interception function. """ # if weaving has to be done if pointcut is None or pointcut(target): # get target interception function interception_fn = _get_function(target) # does not handle not python functions if interception_fn is not None: # flag which specifies if poincut has to by applied # True if target is not intercepted apply_poincut = not is_intercepted(target) # apply poincut if not intercepted if (not apply_poincut) and ctx is not None: # apply poincut if ctx is not intercepted_ctx intercepted_fn, intercepted_ctx = get_intercepted(target) # if previous weave was done directly on the function if intercepted_ctx is None: # update intercepted_ctx on target intercepted_ctx = interception_fn._intercepted_ctx = ctx # if old ctx and the new one are same if ctx is not intercepted_ctx: # apply pointcut apply_poincut = True # and update interception_fn interception_fn = intercepted_fn # if weave has to be done if apply_poincut: # instantiate a new joinpoint if pointcut_application is None if pointcut_application is None: pointcut_application = _Joinpoint().apply_pointcut interception_fn = pointcut_application( target=target, function=interception_fn, ctx=ctx ) # add advices to the interception function _add_advices( target=interception_fn, advices=advices ) # append interception function to the intercepted ones intercepted.append(interception_fn) # search inside the target elif depth > 0: # for an object or a class, weave on methods # get the right ctx if ctx is None: ctx = target for _, member in getmembers(ctx, depth_predicate): _weave( target=member, advices=advices, pointcut=pointcut, depth_predicate=depth_predicate, intercepted=intercepted, pointcut_application=pointcut_application, depth=depth - 1, ctx=ctx )
[docs]def unweave( target, advices=None, pointcut=None, ctx=None, depth=1, public=False, ): """Unweave advices on target with input pointcut. :param callable target: target from where checking pointcut and weaving advices. :param pointcut: condition for weaving advices on joinpointe. The condition depends on its type. :type pointcut: - NoneType: advices are weaved on target. - str: target name is compared to pointcut regex. - function: called with target in parameter, if True, advices will be weaved on target. :param ctx: target ctx (class or instance). :param int depth: class weaving depthing. :param bool public: (default True) weave only on public members :return: the intercepted functions created from input target. """ # ensure advices is a list if not None if advices is not None: if isroutine(advices): advices = [advices] # initialize pointcut # do nothing if pointcut is None or is callable if pointcut is None or callable(pointcut): pass # in case of str, use a name matcher elif isinstance(pointcut, string_types): pointcut = _namematcher(pointcut) else: error_msg = "Wrong pointcut to check weaving on {0}.".format(target) advice_msg = "Must be None, or be a str or a function/method." right_msg = "Not {0}".format(type(pointcut)) raise AdviceError( "{0} {1} {2}".format(error_msg, advice_msg, right_msg) ) # get the right ctx if ctx is None: ctx = find_ctx(target) _unweave( target=target, advices=advices, pointcut=pointcut, ctx=ctx, depth=depth, depth_predicate=_publiccallable if public else callable )
def _unweave(target, advices, pointcut, ctx, depth, depth_predicate): """Unweave deeply advices in target.""" # if weaving has to be done if pointcut is None or pointcut(target): # do something only if target is intercepted if is_intercepted(target): _remove_advices(target=target, advices=advices, ctx=ctx) # search inside the target if depth > 0: # for an object or a class, weave on methods # get base ctx _base_ctx = None if ctx is not None: _base_ctx = base_ctx(ctx) for _, member in getmembers(target, depth_predicate): _unweave( target=member, advices=advices, pointcut=pointcut, depth=depth - 1, depth_predicate=depth_predicate, ctx=_base_ctx )
[docs]def weave_on(advices, pointcut=None, ctx=None, depth=1, ttl=None): """Decorator for weaving advices on a callable target. :param pointcut: condition for weaving advices on joinpointe. The condition depends on its type. :param ctx: target ctx (instance or class). :type pointcut: - NoneType: advices are weaved on target. - str: target name is compared to pointcut regex. - function: called with target in parameter, if True, advices will be weaved on target. :param depth: class weaving depthing :type depth: int :param public: (default True) weave only on public members :type public: bool """ def __weave(target): """Internal weave function.""" weave( target=target, advices=advices, pointcut=pointcut, ctx=ctx, depth=depth, ttl=ttl ) return target return __weave
[docs]class Advice(object): """Advice class which aims to embed an advice function with disabling property. """ __slots__ = ('_impl', '_enable', '_uid') def __init__(self, impl, uid=None, enable=True): self._impl = impl self._enable = enable self._uid = uuid() if uid is None else uid @property def uid(self): """Get advice uid.""" return self._uid @property def enable(self): """Get self enable state. Change state if input enable is a boolean. TODO: change of method execution instead of saving a state. """ return self._enable @enable.setter def enable(self, value): """Change of enable status.""" self._enable = value
[docs] def apply(self, joinpoint): """Apply this advice on input joinpoint. TODO: improve with internal methods instead of conditional test. """ if self._enable: result = self._impl(joinpoint) else: result = joinpoint.proceed() return result
@staticmethod
[docs] def set_enable(target, enable=True, advice_ids=None): """Enable or disable all target Advices designated by input advice_ids. If advice_ids is None, apply (dis|en)able state to all advices. """ advices = get_advices(target) for advice in advices: try: if isinstance(Advice) \ and (advice_ids is None or advice.uuid in advice_ids): advice.enable = enable except ValueError: pass
@staticmethod
[docs] def weave(target, advices, pointcut=None, depth=1, public=False): """Weave advices such as Advice objects.""" advices = ( advice if isinstance(advice, Advice) else Advice(advice) for advice in advices ) weave( target=target, advices=advices, pointcut=pointcut, depth=depth, public=public )
@staticmethod
[docs] def unweave(target, *advices): """Unweave advices from input target.""" advices = ( advice if isinstance(advice, Advice) else Advice(advice) for advice in advices ) unweave(target=target, *advices)
def __call__(self, joinpoint): """Shortcut for self apply.""" return self.apply(joinpoint) def __hash__(self): """Return self uid hash.""" result = hash(self._uid) return result def __eq__(self, other): """Compare with self uid.""" result = isinstance(other, Advice) and other.uid == self._uid return result