raises(ValueError, 'c.cfgimpl_get_settings()[a] = ("test",)')
+def test_optiondescription_requires():
+ a = BoolOption('activate_service', '', True)
+ b = BoolOption('ip_address_service', '', multi=True)
+ a, b
+ OptionDescription('service', '', [b], requires=[{'option': a, 'expected': False, 'action': 'disabled'}])
+
+
+def test_optiondescription_requires_multi():
+ a = BoolOption('activate_service', '', True)
+ b = IPOption('ip_address_service', '', multi=True)
+ a, b
+ raises(ValueError, "OptionDescription('service', '', [a], requires=[{'option': b, 'expected': False, 'action': 'disabled'}])")
+
+
def test_properties_conflict():
a = BoolOption('activate_service', '', True)
a
s = SymLinkOption('s', u)
o = OptionDescription('o', '', [b, u, s])
o1 = OptionDescription('o1', '', [o])
- o1.impl_build_cache_consistency()
o1.impl_build_cache_option()
a = dumps(o1)
b4 = BoolOption("b4", "", callback=return_value, callback_params={'': ((None,),), 'value': ('string',)})
o = OptionDescription('o', '', [b, b2, b3, b4])
o1 = OptionDescription('o1', '', [o])
- o1.impl_build_cache_consistency()
o1.impl_build_cache_option()
a = dumps(o1)
def _impl_build_all_caches(self):
descr = self.cfgimpl_get_description()
if not descr.impl_already_build_caches():
- descr.impl_build_cache_consistency()
+ descr.impl_build_cache()
descr.impl_build_cache_option()
- descr.impl_validate_options()
def read_only(self):
"read only is a global config's setting, see `settings.py`"
allow_empty_list=undefined):
if not valid_name(name): # pragma: optional cover
raise ValueError(_("invalid name: {0} for option").format(name))
- if requires is not None:
- calc_properties, requires = validate_requires_arg(
- requires, name)
- else:
- calc_properties = frozenset()
- requires = undefined
if not multi and default_multi is not None: # pragma: optional cover
raise ValueError(_("default_multi is set whereas multi is False"
" in option: {0}").format(name))
if multi is True:
+ is_multi = True
_multi = 0
elif multi is False:
+ is_multi = False
_multi = 1
elif multi is submulti:
+ is_multi = True
_multi = submulti
else:
raise ValueError(_('invalid multi value'))
+ if requires is not None:
+ calc_properties, requires = validate_requires_arg(is_multi,
+ requires, name)
+ else:
+ calc_properties = frozenset()
+ requires = undefined
if properties is None:
properties = tuple()
if not isinstance(properties, tuple): # pragma: optional cover
allow_empty_list)
if multi is not False and default is None:
default = []
- self.impl_validate(default, is_multi=_multi != 1)
- self._set_default_values(default, default_multi, _multi != 1)
+ self.impl_validate(default, is_multi=is_multi)
+ self._set_default_values(default, default_multi, is_multi)
##callback is False in optiondescription
if callback is not False:
self.impl_set_callback(callback, callback_params, _init=True)
"is calculated").format(self.impl_getname()))
-def validate_requires_arg(requires, name):
+def validate_requires_arg(multi, requires, name):
"""check malformed requirements
and tranform dict to internal tuple
if not isinstance(option, Option): # pragma: optional cover
raise ValueError(_('malformed requirements '
'must be an option in option {0}').format(name))
+ if not multi and option.impl_is_multi():
+ raise ValueError(_('malformed requirements '
+ 'multi option must not set '
+ 'as requires of non multi option {0}').format(name))
if expected is not None:
try:
option._validate(expected)
del(self._stated)
except AttributeError: # pragma: optional cover
pass
- self._set_readonly()
+ self._set_readonly(True)
def impl_get_information(self, key, default=undefined):
return self._impl_getopt().impl_get_information(key, default)
_setattr(self, '_cache_consistencies', None)
# the group_type is useful for filtering OptionDescriptions in a config
_setattr(self, '_group_type', groups.default)
- _setattr(self, '_is_build_cache', False)
def impl_getdoc(self):
return self.impl_get_information('doc')
"""
return _impl_getpaths(self, include_groups, _currpath)
- def impl_build_cache_consistency(self, _consistencies=None, cache_option=None):
- if _consistencies is None:
+ def impl_build_cache(self, _consistencies=None, cache_option=None):
+ """validate duplicate option and set option has readonly option
+ """
+ if cache_option is None:
init = True
_consistencies = {}
cache_option = []
else:
init = False
for option in self._impl_getchildren(dyn=False):
+ #FIXME specifique id for sqlalchemy?
+ #FIXME avec sqlalchemy ca marche le multi parent ? (dans des configs différentes)
cache_option.append(option._get_id())
- if not isinstance(option, OptionDescription):
+ if isinstance(option, OptionDescription):
+ option._set_readonly(False)
+ option.impl_build_cache(_consistencies, cache_option)
+ #cannot set multi option as OptionDescription requires
+ else:
+ option._set_readonly(True)
for func, all_cons_opts, params in option._get_consistencies():
all_cons_opts[0]._valid_consistencies(all_cons_opts[1:])
for opt in all_cons_opts:
[]).append((func,
all_cons_opts,
params))
- else:
- option.impl_build_cache_consistency(_consistencies, cache_option)
- if init and _consistencies != {}:
- self._cache_consistencies = {}
- for opt, cons in _consistencies.items():
- if opt._get_id() not in cache_option: # pragma: optional cover
- raise ConfigError(_('consistency with option {0} '
- 'which is not in Config').format(
- opt.impl_getname()))
- self._cache_consistencies[opt] = tuple(cons)
-
- def impl_validate_options(self, cache_option=None):
- """validate duplicate option and set option has readonly option
- """
- if cache_option is None:
- init = True
- cache_option = []
- else:
- init = False
- for option in self._impl_getchildren(dyn=False):
- #FIXME specifique id for sqlalchemy?
- #FIXME avec sqlalchemy ca marche le multi parent ? (dans des configs différentes)
- oid = option._get_id()
- cache_option.append(oid)
- option._set_readonly()
- if isinstance(option, OptionDescription):
- option.impl_validate_options(cache_option)
- if option.impl_getrequires() != []:
- for requires in option.impl_getrequires():
- for require in requires:
- if require[0].impl_is_multi():
- if option.impl_is_master_slaves('slave') and require[0].impl_is_master_slaves():
- if option.impl_get_master_slaves() != require[0].impl_get_master_slaves():
- raise ValueError(_('malformed requirements option {0} '
- 'must be in same master/slaves for {1}').format(
- require[0].impl_getname(), option.impl_getname()))
- else:
- raise ValueError(_('malformed requirements option {0} '
- 'must not be a multi for {1}').format(
- require[0].impl_getname(), option.impl_getname()))
+ is_slave = None
+ if option.impl_is_multi():
+ all_requires = option.impl_getrequires()
+ if all_requires != tuple():
+ for requires in all_requires:
+ for require in requires:
+ #if option in require is a multi:
+ # * option in require must be a master or a slave
+ # * current option must be a slave (and only a slave)
+ # * option in require and current option must be in same master/slaves
+ require_opt = require[0]
+ if require_opt.impl_is_multi():
+ if is_slave is None:
+ is_slave = option.impl_is_master_slaves('slave')
+ if is_slave:
+ masterslaves = option.impl_get_master_slaves()
+ if is_slave and require_opt.impl_is_master_slaves():
+ if masterslaves != require_opt.impl_get_master_slaves():
+ raise ValueError(_('malformed requirements option {0} '
+ 'must be in same master/slaves for {1}').format(
+ require_opt.impl_getname(), option.impl_getname()))
+ else:
+ raise ValueError(_('malformed requirements option {0} '
+ 'must not be a multi for {1}').format(
+ require_opt.impl_getname(), option.impl_getname()))
if init:
if len(cache_option) != len(set(cache_option)):
for idx in xrange(1, len(cache_option) + 1):
opt = cache_option.pop(0)
if opt in cache_option:
raise ConflictError(_('duplicate option: {0}').format(opt))
- self._set_readonly()
+ if _consistencies != {}:
+ self._cache_consistencies = {}
+ for opt, cons in _consistencies.items():
+ if opt._get_id() not in cache_option: # pragma: optional cover
+ raise ConfigError(_('consistency with option {0} '
+ 'which is not in Config').format(
+ opt.impl_getname()))
+ self._cache_consistencies[opt] = tuple(cons)
+ self._set_readonly(False)
# ____________________________________________________________
def impl_set_group_type(self, group_type):
:param descr: parent :class:`tiramisu.option.OptionDescription`
"""
if descr is None:
- #FIXME faut le desactiver ?
- #self.impl_build_cache_consistency()
self.impl_build_cache_option()
descr = self
super(OptionDescription, self)._impl_getstate(descr)
from ...i18n import _
from ...setting import undefined
from ...error import ConfigError
+static_tuple = tuple()
+static_set = frozenset()
#____________________________________________________________
'_stated',
'_state_consistencies',
'_state_informations',
+ '_state_extra',
'_state_readonly',
'__weakref__'
)
"""
error = False
dico = self._informations
- if dico is None or isinstance(dico, str) or isinstance(dico, unicode):
+ if isinstance(dico, str) or isinstance(dico, unicode):
if key == 'doc':
return dico
if default is not undefined:
self._consistencies.pop(-1)
def _get_consistencies(self):
- try:
- return self._consistencies
- except AttributeError:
- return tuple()
+ return getattr(self, '_consistencies', static_tuple)
def _set_callback(self, callback, callback_params):
if callback_params is None or callback_params == {}:
return ret_call
def impl_get_calc_properties(self):
- try:
- return self._calc_properties
- except AttributeError:
- return frozenset()
+ return getattr(self, '_calc_properties', static_set)
def impl_getrequires(self):
- try:
- return self._requires
- except AttributeError:
- return []
+ return getattr(self, '_requires', static_tuple)
def _set_validator(self, validator, validator_params):
if validator_params is None:
def _impl_getopt(self):
return self._opt
- def _set_readonly(self):
+ def _set_readonly(self, has_extra):
if not self.impl_is_readonly():
- dico = self._informations
_setattr = object.__setattr__
- if not (dico is None or isinstance(dico, str) or isinstance(dico, unicode)):
- keys = tuple(dico.keys())
- if keys == ('doc',):
- dico = dico['doc']
- else:
- dico = tuple([tuple(dico.keys()), tuple(dico.values())])
- _setattr(self, '_informations', dico)
- try:
- extra = self._extra
- _setattr(self, '_extra', tuple([tuple(extra.keys()), tuple(extra.values())]))
- except AttributeError:
- pass
+ dico = self._informations
+ keys = tuple(dico.keys())
+ if len(keys) == 1:
+ dico = dico['doc']
+ else:
+ dico = tuple([keys, tuple(dico.values())])
+ _setattr(self, '_informations', dico)
+ if has_extra:
+ extra = getattr(self, '_extra', None)
+ if extra is not None:
+ _setattr(self, '_extra', tuple([tuple(extra.keys()), tuple(extra.values())]))
def _impl_setsubdyn(self, subdyn):
self._subdyn = subdyn
if isinstance(infos, tuple):
self._state_informations = {}
for idx, key in enumerate(infos[0]):
- value = infos[1][idx]
- self._state_informations[key] = value
+ self._state_informations[key] = infos[1][idx]
elif isinstance(infos, str) or isinstance(infos, unicode):
self._state_informations = {'doc': infos}
else:
except AttributeError:
pass
if self._state_readonly:
- self._set_readonly()
+ self._set_readonly(True)
del(self._state_readonly)
+ def _impl_convert_extra(self, descr, load=False):
+ if not load:
+ try:
+ extra = self._extra
+ if isinstance(extra, tuple):
+ self._state_extra = {}
+ for idx, key in enumerate(extra[0]):
+ self._state_extra[key] = extra[1][idx]
+ except AttributeError:
+ pass
+ else:
+ try:
+ self._extra = self._state_extra
+ del(self._state_extra)
+ except AttributeError:
+ pass
+
def _impl_getattributes(self):
slots = set()
for subclass in self.__class__.__mro__:
return self._name
def impl_is_multi(self):
- try:
- _multi = self._multi
- except AttributeError:
- return False
+ _multi = getattr(self, '_multi', 1)
return _multi != 1
def impl_is_submulti(self):
class StorageOptionDescription(StorageBase):
__slots__ = ('_children', '_cache_paths', '_cache_consistencies',
- '_group_type', '_is_build_cache', '_state_group_type')
+ '_group_type', '_state_group_type')
def __init__(self, name, multi, warnings_only, doc, extra):
super(StorageOptionDescription, self).__init__(name, multi,
warnings_only, doc,
None, undefined,
undefined, undefined)
- self._cache_paths = None
def _add_children(self, child_names, children):
_setattr = object.__setattr__
_setattr(self, '_children', (tuple(child_names), tuple(children)))
def impl_already_build_caches(self):
- return self._is_build_cache
+ return getattr(self, '_cache_paths', None) is not None
def impl_get_opt_by_path(self, path):
try:
raise AttributeError(_('no option for path {0}').format(path))
def impl_get_path_by_opt(self, opt):
- if self._cache_paths is None:
+ if getattr(self, '_cache_paths', None) is None:
raise ConfigError(_('use impl_get_path_by_opt only with root OptionDescription'))
try:
return self._cache_paths[1][self._cache_paths[0].index(opt)]
def impl_build_cache_option(self, _currpath=None, cache_path=None,
cache_option=None):
- _setattr = object.__setattr__
- try:
- self._cache_paths
- except AttributeError:
- _setattr(self, '_cache_paths', None)
- if _currpath is None and self._cache_paths is not None: # pragma: optional cover
+
+ if _currpath is None and getattr(self, '_cache_paths', None) is not None:
# cache already set
return
if _currpath is None:
cache_option)
_currpath.pop()
if save:
+ _setattr = object.__setattr__
_setattr(self, '_cache_paths', (tuple(cache_option), tuple(cache_path)))
- _setattr(self, '_is_build_cache', True)
def impl_get_options_paths(self, bytype, byname, _subpath, only_first, context):
find_results = []