Source code for contrail_api_cli.command

# -*- coding: utf-8 -*-
from __future__ import unicode_literals
import sys
import inspect
import argparse
import abc
from fnmatch import fnmatch
from collections import OrderedDict
from six import add_metaclass, text_type

from .resource import Resource
from .resource import Collection, RootCollection
from .schema import ResourceNotDefined
from .utils import Path, classproperty, parallel_map
from .exceptions import CommandError, NotFound
from .context import Context


class ArgumentParser(argparse.ArgumentParser):
    """Argument parser that signals termination with an exception.

    The stock ``argparse`` parser ends the process from ``exit``; this
    variant raises ``CommandError`` instead so the caller stays in control.
    """

    def exit(self, status=0, message=None):
        """Raise ``CommandError`` in place of exiting.

        :param status: accepted for signature compatibility, not used
        :param message: text carried by the exception; any falsy value
                        is replaced by an empty string
        :raises CommandError: always
        """
        text = message if message else ''
        raise CommandError(text)


class BaseOption(object):
    """Shared description of a command parameter.

    The keyword arguments given at construction are stored untouched in
    ``kwargs``, except for ``complete`` which is extracted into its own
    attribute. ``attr`` starts empty and is meant to be filled in later
    with the name the parameter is bound to.
    """

    # Counter shared by every instance (subclasses included); each new
    # instance records the current value so parameters can be ordered
    # by creation.
    _creation_idx = 0

    def __init__(self, *args, **kwargs):
        # Positional arguments are accepted but not used.
        self.attr = ''
        self.complete = kwargs.pop('complete', None)
        self.kwargs = kwargs
        # keep track of options order
        self._creation_idx = BaseOption._creation_idx
        BaseOption._creation_idx += 1

    @property
    def help(self):
        """Help text with ``%(name)s`` placeholders filled from ``kwargs``."""
        template = self.kwargs.get('help', '')
        return template % self.kwargs

    @property
    def dest(self):
        """Explicit ``dest`` keyword when given, ``attr`` otherwise."""
        if 'dest' in self.kwargs:
            return self.kwargs['dest']
        return self.attr

    @property
    def is_multiple(self):
        """True when ``nargs`` is ``*``/``+`` or ``action`` is ``append``."""
        if self.kwargs.get('nargs') in ('*', '+'):
            return True
        return self.kwargs.get('action') in ('append',)

    @property
    def nargs(self):
        """The ``nargs`` keyword; 1 when it is absent or equal to ``?``."""
        value = self.kwargs.get('nargs', 1)
        return 1 if value == '?' else value

    def __repr__(self):
        kind = self.__class__.__name__
        return '%s(%s)' % (kind, self.attr)


class Option(BaseOption):
    """Named parameter of a command (``--long-name`` and optional short name)."""

    def __init__(self, short_name=None, **kwargs):
        """Store the short name; remaining keywords go to ``BaseOption``.

        :param short_name: short option string, or None when there is none
        """
        self.short_name = short_name
        BaseOption.__init__(self, **kwargs)

    @property
    def need_value(self):
        """False for ``store_true``/``store_false`` actions, True otherwise."""
        action = self.kwargs.get('action')
        return not (action == 'store_true' or action == 'store_false')

    @property
    def long_name(self):
        """Long option string: ``--`` plus ``attr`` with ``_`` turned into ``-``."""
        dashed = self.attr.replace('_', '-')
        return '--%s' % dashed

    @property
    def option_strings(self):
        """Long name followed by the short name when one is set."""
        names = []
        for candidate in (self.long_name, self.short_name):
            if candidate is not None:
                names.append(candidate)
        return names


class Arg(BaseOption):
    """Command parameter registered on the parser under its bare name.

    Adds nothing to ``BaseOption``; the distinct type lets callers tell
    arguments apart from ``Option`` instances.
    """


def experimental(cls):
    """Class decorator marking a command as experimental.

    Replaces ``cls.__call__`` with a wrapper that prints a warning and then
    delegates to the original implementation.

    :param cls: class whose ``__call__`` is wrapped
    :returns: the same class object, modified in place
    """
    import functools

    old_call = cls.__call__

    # wraps() keeps the name and docstring of the original __call__.
    @functools.wraps(old_call)
    def new_call(self, *args, **kwargs):
        print("This command is experimental. Use at your own risk.")
        # Hand the result back to the caller; without the return the
        # wrapped command would always appear to produce None.
        return old_call(self, *args, **kwargs)

    cls.__call__ = new_call
    return cls


def _path_to_resources(path, predicate=None, filters=None, parent_uuid=None):
    """Yield the resources or the collection designated by ``path``.

    Three kinds of path are handled:

    * a path containing ``*`` or ``?``: a collection is fetched (the root
      collection when the wildcard is in ``path.base``) and every member
      whose path, or ``/<type>/<fq_name>`` path, matches the pattern is
      yielded;
    * a resource path: the single resource is looked up by uuid or by
      fq_name and yielded;
    * a collection path: the collection itself is yielded.

    Nothing is yielded for any other path.

    :param path: path to resolve
    :param predicate: optional callable; items for which it returns a
                      falsy value are skipped
    :param filters: passed to the collection constructors
    :param parent_uuid: passed to the collection constructors
    :raises CommandError: when the resource lookup raises
                          ResourceNotDefined
    """
    wildcards = ('*', '?')
    pattern = text_type(path)
    if any(c in pattern for c in wildcards):
        if any(c in path.base for c in wildcards):
            col = RootCollection(fetch=True,
                                 filters=filters,
                                 parent_uuid=parent_uuid)
        else:
            col = Collection(path.base, fetch=True,
                             filters=filters,
                             parent_uuid=parent_uuid)
        for r in col:
            if predicate and not predicate(r):
                continue
            # list of paths to match against
            paths = (r.path,
                     Path('/', r.type, text_type(r.fq_name)))
            if any(fnmatch(text_type(p), pattern) for p in paths):
                yield r
    elif path.is_resource:
        if path.is_uuid:
            kwargs = {'uuid': path.name}
        else:
            kwargs = {'fq_name': path.name}
        try:
            r = Resource(path.base,
                         check=True,
                         **kwargs)
        except ResourceNotDefined as e:
            raise CommandError(text_type(e))
        if predicate and not predicate(r):
            # A plain return ends the generator. Raising StopIteration
            # here is turned into RuntimeError by Python 3.7+ (PEP 479).
            return
        yield r
    elif path.is_collection:
        c = Collection(path.base,
                       filters=filters,
                       parent_uuid=parent_uuid)
        if predicate and not predicate(c):
            return
        yield c


def expand_paths(paths=None, predicate=None, filters=None, parent_uuid=None):
    """Return a unique list of resources or collections from a list of paths.

    Supports fq_name and wildcard resolution. Each path is joined to the
    shell's current path; when no path is given the current path itself
    is used.

    >>> expand_paths(['virtual-network',
    ...               'floating-ip/2a0a54b4-a420-485e-8372-42f70a627ec9'])
    [Collection('virtual-network'),
     Resource('floating-ip', uuid='2a0a54b4-a420-485e-8372-42f70a627ec9')]

    :param paths: list of paths relative to the current path
                  that may contain wildcards (*, ?) or fq_names
    :type paths: [str]
    :param predicate: function to filter found resources
    :type predicate: f(resource) -> bool
    :param filters: list of filters for Collections
    :type filters: [(name, value), ...]
    :param parent_uuid: forwarded unchanged to the path resolution
    :rtype: [Resource or Collection]
    :raises NotFound: nothing was found for the given paths
    """
    # NOTE(review): earlier documentation listed BadPath as the exception
    # raised here; this function itself only raises NotFound - confirm
    # whether a callee can still raise BadPath.
    if paths:
        targets = [Context().shell.current_path / item for item in paths]
    else:
        targets = [Context().shell.current_path]

    lookup_options = {
        'predicate': predicate,
        'filters': filters,
        'parent_uuid': parent_uuid,
    }

    # Keyed by path so that an item found through several of the
    # requested paths appears once, in first-seen order.
    unique = OrderedDict()
    for found in parallel_map(_path_to_resources, targets,
                              kwargs=lookup_options,
                              workers=50):
        unique.update((entry.path, entry) for entry in found)

    if not unique:
        raise NotFound()
    return list(unique.values())


@add_metaclass(abc.ABCMeta)
class Command(object):
    """Base class for commands

    Parameters are declared as class attributes holding ``Option`` or
    ``Arg`` instances; they are collected in creation order and added to
    the command's argument parser. Subclasses must implement ``__call__``.
    """
    description = ""
    """Description of the command"""
    aliases = []
    """Command aliases"""
    # Per-class caches filled lazily by the ``options`` / ``args`` properties.
    _options = None
    _args = None

    def __init__(self, name):
        """Create the argument parser of the command.

        :param name: program name given to the parser
        """
        self.parser = ArgumentParser(prog=name,
                                     description=self.description)
        self.add_arguments_to_parser(self.parser)
        self._is_piped = False

    def current_path(self, resource):
        """Return the path of resource relative to the shell's current path

        :param resource: resource or collection
        :type resource: Resource|Collection

        :rtype: str
        """
        return text_type(resource.path.relative_to(Context().shell.current_path))

    @property
    def is_piped(self):
        """Return True if the command result is being piped to another
        command, i.e. stdout is not a tty or the flag was set explicitly.

        :rtype: bool
        """
        return not sys.stdout.isatty() or self._is_piped

    @is_piped.setter
    def is_piped(self, value):
        self._is_piped = value

    @staticmethod
    def _declared_members(owner, kind, cache_attr):
        """Collect the class attributes of type ``kind``, in creation order.

        The result is cached on the class under ``cache_attr``. The cache
        is read from the class's own ``__dict__`` only: a plain attribute
        lookup would also find the cache of a parent class that was used
        earlier, and the subclass would then miss its own declarations.

        :param owner: command class (or an instance of it)
        :param kind: ``Option`` or ``Arg``
        :param cache_attr: name of the class attribute holding the cache
        :rtype: OrderedDict
        """
        # Normalise in case the property descriptor hands over an
        # instance, so the cache always lives on the class.
        klass = owner if inspect.isclass(owner) else type(owner)
        cached = vars(klass).get(cache_attr)
        if cached is not None:
            return cached
        # Publish the (still empty) cache before introspecting:
        # getmembers() evaluates the options/args properties as well and
        # they must return immediately instead of recursing.
        members = OrderedDict()
        setattr(klass, cache_attr, members)
        declared = inspect.getmembers(klass, lambda o: isinstance(o, kind))
        for attr, member in sorted(declared,
                                   key=lambda i: i[1]._creation_idx):
            member.attr = attr
            members[text_type(attr)] = member
        return members

    @classproperty
    def options(cls):
        """``Option`` attributes of the command keyed by attribute name,
        in creation order.

        :rtype: OrderedDict
        """
        return cls._declared_members(cls, Option, '_options')

    @classproperty
    def args(cls):
        """``Arg`` attributes of the command keyed by attribute name,
        in creation order.

        :rtype: OrderedDict
        """
        return cls._declared_members(cls, Arg, '_args')

    @classmethod
    def add_arguments_to_parser(cls, parser):
        """Register the command's args, then its options, on ``parser``.

        :param parser: parser receiving the ``add_argument`` calls
        """
        for arg_name, arg in cls.args.items():
            parser.add_argument(arg_name, **arg.kwargs)
        for option in cls.options.values():
            parser.add_argument(*option.option_strings, **option.kwargs)

    def parse_and_call(self, *args):
        """Parse ``args`` and call the command with the parsed values.

        :param args: command-line tokens for this command
        :returns: whatever ``__call__`` returns
        """
        namespace = self.parser.parse_args(args=args)
        return self.__call__(**vars(namespace))

    @abc.abstractmethod
    def __call__(self, **kwargs):
        """Command must implement this method.

        The command must return an unicode string
        (unicode in python2 or str in python3)

        :param kwargs: options of the command

        :rtype: unicode | str
        """