i1 = IntOption('i1', '')
od1 = OptionDescription('od1', '', [i1])
od2 = OptionDescription('od2', '', [od1])
- conf1 = Config(od2)
- conf2 = Config(od2)
+ conf1 = Config(od2, 'conf1')
+ conf2 = Config(od2, 'conf2')
meta = GroupConfig([conf1, conf2])
w = weakref.ref(conf1)
del(conf1)
i1 = IntOption('i1', '')
od1 = OptionDescription('od1', '', [i1])
od2 = OptionDescription('od2', '', [od1])
- conf1 = Config(od2)
- conf2 = Config(od2)
+ conf1 = Config(od2, 'conf1')
+ conf2 = Config(od2, 'conf2')
meta = MetaConfig([conf1, conf2])
w = weakref.ref(conf1)
del(conf1)
from py.test import raises
-from tiramisu.setting import owners
+from tiramisu.setting import groups, owners
from tiramisu.config import Config, GroupConfig, MetaConfig
-from tiramisu.option import IntOption, OptionDescription
-from tiramisu.error import ConfigError, PropertiesOptionError
+from tiramisu.option import IntOption, StrOption, OptionDescription
+from tiramisu.error import ConfigError, ConflictError
owners.addowner('meta')
i6 = IntOption('i6', '', properties=('disabled',))
od1 = OptionDescription('od1', '', [i1, i2, i3, i4, i5, i6])
od2 = OptionDescription('od2', '', [od1])
- conf1 = Config(od2)
- conf2 = Config(od2)
+ conf1 = Config(od2, name='conf1')
+ conf2 = Config(od2, name='conf2')
conf1.read_write()
conf2.read_write()
meta = MetaConfig([conf1, conf2])
conf1, conf2 = meta.cfgimpl_get_children()
assert conf1.od1.i2 == conf2.od1.i2 == 1
assert conf1.getowner(conf1.unwrap_from_path('od1.i2')) is conf2.getowner(conf2.unwrap_from_path('od1.i2')) is owners.default
- meta.setattrs('od1.i2', 6)
+ meta.set_value('od1.i2', 6, only_config=True)
assert meta.od1.i2 == 1
assert conf1.od1.i2 == conf2.od1.i2 == 6
assert conf1.getowner(conf1.unwrap_from_path('od1.i2')) is conf2.getowner(conf2.unwrap_from_path('od1.i2')) is owners.user
'od1.i2': 1, 'od1.i5': [2], 'od1.i6': None}
+def test_group_error():
+ raises(ValueError, "GroupConfig('str')")
+ raises(ValueError, "GroupConfig(['str'])")
+
+
def test_meta_meta():
meta1 = make_description()
meta2 = MetaConfig([meta1])
meta2 = MetaConfig([meta1])
meta2.cfgimpl_get_settings().setowner(owners.meta)
conf1, conf2 = meta1.cfgimpl_get_children()
- meta2.setattrs('od1.i1', 7)
+ meta2.set_value('od1.i1', 7, only_config=True)
#PropertiesOptionError
- meta2.setattrs('od1.i6', 7)
+ meta2.set_value('od1.i6', 7, only_config=True)
assert conf1.od1.i1 == conf2.od1.i1 == 7
assert conf1.getowner(conf1.unwrap_from_path('od1.i1')) is conf2.getowner(conf2.unwrap_from_path('od1.i1')) is owners.user
- assert [conf1, conf2] == meta2.find_firsts(byname='i1', byvalue=7, type_='config')
- assert ['od1.i1', 'od1.i1'] == meta2.find_firsts(byname='i1', byvalue=7, type_='path')
+ assert [conf1, conf2] == meta2.find_firsts(byname='i1', byvalue=7).cfgimpl_get_children()
conf1.od1.i1 = 8
- assert [conf1, conf2] == meta2.find_firsts(byname='i1', type_='config')
- assert ['od1.i1', 'od1.i1'] == meta2.find_firsts(byname='i1', type_='path')
- assert [conf2] == meta2.find_firsts(byname='i1', byvalue=7, type_='config')
- assert ['od1.i1'] == meta2.find_firsts(byname='i1', byvalue=7, type_='path')
- assert [conf1] == meta2.find_firsts(byname='i1', byvalue=8, type_='config')
- assert ['od1.i1'] == meta2.find_firsts(byname='i1', byvalue=8, type_='path')
- assert [conf1, conf2] == meta2.find_firsts(byname='i5', byvalue=2, type_='config')
- assert ['od1.i5', 'od1.i5'] == meta2.find_firsts(byname='i5', byvalue=2, type_='path')
+ assert [conf1, conf2] == meta2.find_firsts(byname='i1').cfgimpl_get_children()
+ assert [conf2] == meta2.find_firsts(byname='i1', byvalue=7).cfgimpl_get_children()
+ assert [conf1] == meta2.find_firsts(byname='i1', byvalue=8).cfgimpl_get_children()
+ assert [conf1, conf2] == meta2.find_firsts(byname='i5', byvalue=2).cfgimpl_get_children()
raises(AttributeError, "meta2.find_firsts(byname='i1', byvalue=10)")
raises(AttributeError, "meta2.find_firsts(byname='not', byvalue=10)")
raises(AttributeError, "meta2.find_firsts(byname='i6')")
i1 = IntOption('i1', '')
od1 = OptionDescription('od1', '', [i1])
od2 = OptionDescription('od2', '', [od1])
- conf1 = Config(od2)
- conf2 = Config(od2)
+ conf1 = Config(od2, name='conf1')
+ conf2 = Config(od2, name='conf2')
+ conf3 = Config(od2)
+ conf4 = Config(od2, name='conf2')
raises(ValueError, "GroupConfig(conf1)")
- meta = GroupConfig([conf1, conf2])
- raises(ConfigError, 'meta.od1.i1')
- conf1, conf2 = meta.cfgimpl_get_children()
- meta.setattrs('od1.i1', 7)
- assert conf1.od1.i1 == conf2.od1.i1 == 7
- assert conf1.getowner(conf1.unwrap_from_path('od1.i1')) is conf2.getowner(conf2.unwrap_from_path('od1.i1')) is owners.user
+ #same name
+ raises(ConflictError, "GroupConfig([conf2, conf4])")
+ grp = GroupConfig([conf1, conf2])
+ raises(ConfigError, 'grp.od1.i1')
+ conf1, conf2 = grp.cfgimpl_get_children()
+ grp.set_value('od1.i1', 7)
+ assert grp.conf1.od1.i1 == conf2.od1.i1 == 7
+ assert grp.conf1.getowner(grp.conf1.unwrap_from_path('od1.i1')) is grp.conf2.getowner(grp.conf2.unwrap_from_path('od1.i1')) is owners.user
def test_group_find_firsts():
i1 = IntOption('i1', '')
od1 = OptionDescription('od1', '', [i1])
od2 = OptionDescription('od2', '', [od1])
- conf1 = Config(od2)
- conf2 = Config(od2)
- meta = GroupConfig([conf1, conf2])
- conf1, conf2 = meta.find_firsts(byname='i1')
+ conf1 = Config(od2, name='conf1')
+ conf2 = Config(od2, name='conf2')
+ grp = GroupConfig([conf1, conf2])
+ assert [conf1, conf2] == grp.find_firsts(byname='i1').cfgimpl_get_children()
def test_meta_path():
i4 = IntOption('i4', '', default=2)
od1 = OptionDescription('od1', '', [i1, i2, i3, i4])
od2 = OptionDescription('od2', '', [od1])
- conf1 = Config(od2)
- conf2 = Config(od2)
- conf3 = Config(od2)
- conf4 = Config(od1)
+ conf1 = Config(od2, name='conf1')
+ conf2 = Config(od2, name='conf2')
+ conf3 = Config(od2, name='conf3')
+ conf4 = Config(od1, name='conf4')
meta = MetaConfig([conf1, conf2])
meta.cfgimpl_get_settings().setowner(owners.meta)
raises(TypeError, 'MetaConfig("string")')
+ #same descr but conf1 already in meta
raises(ValueError, "MetaConfig([conf1, conf3])")
+ #not same descr
raises(ValueError, "MetaConfig([conf3, conf4])")
+
+
+def test_meta_master_slaves():
+ ip_admin_eth0 = StrOption('ip_admin_eth0', "ip", multi=True)
+ netmask_admin_eth0 = StrOption('netmask_admin_eth0', "mask", multi=True, properties=('hidden',))
+ interface1 = OptionDescription('ip_admin_eth0', '', [ip_admin_eth0, netmask_admin_eth0])
+ interface1.impl_set_group_type(groups.master)
+ conf1 = Config(interface1, name='conf1')
+ conf2 = Config(interface1, name='conf2')
+ meta = MetaConfig([conf1, conf2])
+ meta.conf1.read_only()
+ meta.conf2.read_only()
+ assert [conf1, conf2] == meta.find_firsts(byname='netmask_admin_eth0').cfgimpl_get_children()
+ meta.conf1.read_write()
+ meta.conf2.read_only()
+ assert [conf2] == meta.find_firsts(byname='netmask_admin_eth0').cfgimpl_get_children()
+ meta.conf2.read_write()
+ raises(AttributeError, "meta.find_firsts(byname='netmask_admin_eth0')")
+ assert [conf1, conf2] == meta.find_firsts(byname='netmask_admin_eth0',
+ check_properties=None).cfgimpl_get_children()
+ meta.conf1.read_only()
+ meta.conf2.read_only()
+ meta.read_write()
+ assert [conf1, conf2] == meta.find_firsts(byname='netmask_admin_eth0').cfgimpl_get_children()
+
+
+def test_meta_master_slaves_value():
+ ip_admin_eth0 = StrOption('ip_admin_eth0', "ip", multi=True)
+ netmask_admin_eth0 = StrOption('netmask_admin_eth0', "mask", multi=True, properties=('hidden',))
+ interface1 = OptionDescription('ip_admin_eth0', '', [ip_admin_eth0, netmask_admin_eth0])
+ interface1.impl_set_group_type(groups.master)
+ conf1 = Config(interface1, name='conf1')
+ conf2 = Config(interface1, name='conf2')
+ meta = MetaConfig([conf1, conf2])
+ meta.conf1.ip_admin_eth0 = ['192.168.1.1']
+ assert meta.conf1.netmask_admin_eth0 == [None]
+ del(meta.conf1.ip_admin_eth0)
+ assert meta.conf1.netmask_admin_eth0 == []
+ meta.ip_admin_eth0 = ['192.168.1.1']
+ assert meta.conf1.netmask_admin_eth0 == [None]
+ meta.netmask_admin_eth0 = ['255.255.255.0']
+ assert meta.conf1.netmask_admin_eth0 == ['255.255.255.0']
+ meta.netmask_admin_eth0 = ['255.255.0.0']
+ assert meta.conf1.netmask_admin_eth0 == ['255.255.0.0']
+ meta.conf1.ip_admin_eth0 = ['192.168.1.1']
+ assert meta.conf1.netmask_admin_eth0 == [None]
+
+
+def test_meta_master_slaves_owners():
+ ip_admin_eth0 = StrOption('ip_admin_eth0', "ip", multi=True)
+ netmask_admin_eth0 = StrOption('netmask_admin_eth0', "mask", multi=True, properties=('hidden',))
+ interface1 = OptionDescription('ip_admin_eth0', '', [ip_admin_eth0, netmask_admin_eth0])
+ interface1.impl_set_group_type(groups.master)
+ conf1 = Config(interface1, name='conf1')
+ conf2 = Config(interface1, name='conf2')
+ meta = MetaConfig([conf1, conf2])
+ meta.cfgimpl_get_settings().setowner(owners.meta)
+ assert meta.conf1.getowner(ip_admin_eth0) == owners.default
+ assert meta.conf1.getowner(netmask_admin_eth0) == owners.default
+ meta.conf1.ip_admin_eth0 = ['192.168.1.1']
+ assert meta.conf1.getowner(ip_admin_eth0) == owners.user
+ assert meta.conf1.getowner(netmask_admin_eth0) == owners.default
+ del(meta.conf1.ip_admin_eth0)
+ assert meta.conf1.getowner(ip_admin_eth0) == owners.default
+ assert meta.conf1.getowner(netmask_admin_eth0) == owners.default
+ meta.ip_admin_eth0 = ['192.168.1.1']
+ assert meta.conf1.getowner(ip_admin_eth0) == owners.meta
+ assert meta.conf1.getowner(netmask_admin_eth0) == owners.default
+ meta.netmask_admin_eth0 = ['255.255.255.0']
+ assert meta.conf1.getowner(ip_admin_eth0) == owners.meta
+ assert meta.conf1.getowner(netmask_admin_eth0) == owners.meta
+ meta.netmask_admin_eth0 = ['255.255.0.0']
+ assert meta.conf1.getowner(ip_admin_eth0) == owners.meta
+ assert meta.conf1.getowner(netmask_admin_eth0) == owners.meta
+ meta.conf1.ip_admin_eth0 = ['192.168.1.1']
+ assert meta.conf1.getowner(ip_admin_eth0) == owners.user
+ assert meta.conf1.getowner(netmask_admin_eth0) == owners.default
+
+
+def test_meta_force_default():
+ ip_admin_eth0 = StrOption('ip_admin_eth0', "ip", multi=True)
+ netmask_admin_eth0 = StrOption('netmask_admin_eth0', "mask", multi=True, properties=('hidden',))
+ interface1 = OptionDescription('ip_admin_eth0', '', [ip_admin_eth0, netmask_admin_eth0])
+ interface1.impl_set_group_type(groups.master)
+ conf1 = Config(interface1, name='conf1')
+ conf1.read_write()
+ conf2 = Config(interface1, name='conf2')
+ conf2.read_write()
+ meta = MetaConfig([conf1, conf2])
+ meta.read_write()
+ meta.cfgimpl_get_settings().setowner(owners.meta)
+ assert meta.ip_admin_eth0 == []
+ assert meta.conf1.ip_admin_eth0 == []
+ assert meta.conf2.ip_admin_eth0 == []
+ meta.set_value('ip_admin_eth0', ['192.168.1.1'])
+ assert meta.ip_admin_eth0 == ['192.168.1.1']
+ assert meta.conf1.ip_admin_eth0 == ['192.168.1.1']
+ assert meta.conf2.ip_admin_eth0 == ['192.168.1.1']
+ meta.conf1.ip_admin_eth0 = ['192.168.1.2']
+ assert meta.ip_admin_eth0 == ['192.168.1.1']
+ assert meta.conf1.ip_admin_eth0 == ['192.168.1.2']
+ assert meta.conf2.ip_admin_eth0 == ['192.168.1.1']
+ meta.set_value('ip_admin_eth0', ['192.168.1.3'])
+ assert meta.ip_admin_eth0 == ['192.168.1.3']
+ assert meta.conf1.ip_admin_eth0 == ['192.168.1.2']
+ assert meta.conf2.ip_admin_eth0 == ['192.168.1.3']
+ meta.set_value('ip_admin_eth0', ['192.168.1.4'], force_default=True)
+ assert meta.ip_admin_eth0 == ['192.168.1.4']
+ assert meta.conf1.ip_admin_eth0 == ['192.168.1.4']
+ assert meta.conf2.ip_admin_eth0 == ['192.168.1.4']
+
+
+def test_meta_force_dont_change_value():
+ ip_admin_eth0 = StrOption('ip_admin_eth0', "ip", multi=True)
+ netmask_admin_eth0 = StrOption('netmask_admin_eth0', "mask", multi=True, properties=('hidden',))
+ interface1 = OptionDescription('ip_admin_eth0', '', [ip_admin_eth0, netmask_admin_eth0])
+ interface1.impl_set_group_type(groups.master)
+ conf1 = Config(interface1, name='conf1')
+ conf1.read_write()
+ conf2 = Config(interface1, name='conf2')
+ conf2.read_write()
+ meta = MetaConfig([conf1, conf2])
+ meta.read_write()
+ meta.cfgimpl_get_settings().setowner(owners.meta)
+ assert meta.ip_admin_eth0 == []
+ assert meta.conf1.ip_admin_eth0 == []
+ assert meta.conf2.ip_admin_eth0 == []
+ meta.conf1.ip_admin_eth0 = ['192.168.1.4']
+ assert meta.conf1.ip_admin_eth0 == ['192.168.1.4']
+ assert meta.conf2.ip_admin_eth0 == []
+ assert conf1.getowner(ip_admin_eth0) is owners.user
+ assert conf2.getowner(ip_admin_eth0) is owners.default
+ meta.set_value('ip_admin_eth0', ['192.168.1.4'], force_dont_change_value=True)
+ assert meta.ip_admin_eth0 == ['192.168.1.4']
+ assert meta.conf1.ip_admin_eth0 == ['192.168.1.4']
+ assert meta.conf2.ip_admin_eth0 == []
+ assert conf1.getowner(ip_admin_eth0) is owners.user
+ assert conf2.getowner(ip_admin_eth0) is owners.user
+
+
+def test_meta_force_default_if_same():
+ ip_admin_eth0 = StrOption('ip_admin_eth0', "ip", multi=True)
+ netmask_admin_eth0 = StrOption('netmask_admin_eth0', "mask", multi=True, properties=('hidden',))
+ interface1 = OptionDescription('ip_admin_eth0', '', [ip_admin_eth0, netmask_admin_eth0])
+ interface1.impl_set_group_type(groups.master)
+ conf1 = Config(interface1, name='conf1')
+ conf1.read_write()
+ conf2 = Config(interface1, name='conf2')
+ conf2.read_write()
+ meta = MetaConfig([conf1, conf2])
+ meta.read_write()
+ meta.cfgimpl_get_settings().setowner(owners.meta)
+ #
+ assert meta.ip_admin_eth0 == []
+ assert meta.conf1.ip_admin_eth0 == []
+ assert meta.conf2.ip_admin_eth0 == []
+ #
+ meta.conf1.ip_admin_eth0 = ['192.168.1.4']
+ assert meta.conf1.ip_admin_eth0 == ['192.168.1.4']
+ assert meta.conf2.ip_admin_eth0 == []
+ assert conf1.getowner(ip_admin_eth0) is owners.user
+ assert conf2.getowner(ip_admin_eth0) is owners.default
+ meta.set_value('ip_admin_eth0', ['192.168.1.4'], force_default_if_same=True)
+ assert meta.ip_admin_eth0 == ['192.168.1.4']
+ assert meta.conf1.ip_admin_eth0 == ['192.168.1.4']
+ assert meta.conf2.ip_admin_eth0 == ['192.168.1.4']
+ assert conf1.getowner(ip_admin_eth0) is owners.meta
+ assert conf2.getowner(ip_admin_eth0) is owners.meta
+ #
+ meta.conf1.ip_admin_eth0 = ['192.168.1.3']
+ assert meta.conf1.ip_admin_eth0 == ['192.168.1.3']
+ assert meta.conf2.ip_admin_eth0 == ['192.168.1.4']
+ assert conf1.getowner(ip_admin_eth0) is owners.user
+ assert conf2.getowner(ip_admin_eth0) is owners.meta
+ meta.set_value('ip_admin_eth0', ['192.168.1.5'], force_default_if_same=True)
+ assert meta.ip_admin_eth0 == ['192.168.1.5']
+ assert meta.conf1.ip_admin_eth0 == ['192.168.1.3']
+ assert meta.conf2.ip_admin_eth0 == ['192.168.1.5']
+ assert conf1.getowner(ip_admin_eth0) is owners.user
+ assert conf2.getowner(ip_admin_eth0) is owners.meta
+
+
+def test_meta_force_default_if_same_and_dont_change():
+ ip_admin_eth0 = StrOption('ip_admin_eth0', "ip", multi=True)
+ netmask_admin_eth0 = StrOption('netmask_admin_eth0', "mask", multi=True, properties=('hidden',))
+ interface1 = OptionDescription('ip_admin_eth0', '', [ip_admin_eth0, netmask_admin_eth0])
+ interface1.impl_set_group_type(groups.master)
+ conf1 = Config(interface1, name='conf1')
+ conf1.read_write()
+ conf2 = Config(interface1, name='conf2')
+ conf2.read_write()
+ meta = MetaConfig([conf1, conf2])
+ meta.read_write()
+ meta.cfgimpl_get_settings().setowner(owners.meta)
+ #
+ assert meta.ip_admin_eth0 == []
+ assert meta.conf1.ip_admin_eth0 == []
+ assert meta.conf2.ip_admin_eth0 == []
+ #
+ meta.conf1.ip_admin_eth0 = ['192.168.1.4']
+ assert meta.conf1.ip_admin_eth0 == ['192.168.1.4']
+ assert meta.conf2.ip_admin_eth0 == []
+ assert conf1.getowner(ip_admin_eth0) is owners.user
+ assert conf2.getowner(ip_admin_eth0) is owners.default
+ meta.set_value('ip_admin_eth0', ['192.168.1.4'], force_default_if_same=True, force_dont_change_value=True)
+ assert meta.ip_admin_eth0 == ['192.168.1.4']
+ assert meta.conf1.ip_admin_eth0 == ['192.168.1.4']
+ assert meta.conf2.ip_admin_eth0 == []
+ assert conf1.getowner(ip_admin_eth0) is owners.meta
+ assert conf2.getowner(ip_admin_eth0) is owners.user
+ #
+ meta.conf1.ip_admin_eth0 = ['192.168.1.3']
+ assert meta.conf1.ip_admin_eth0 == ['192.168.1.3']
+ assert meta.conf2.ip_admin_eth0 == []
+ assert conf1.getowner(ip_admin_eth0) is owners.user
+ assert conf2.getowner(ip_admin_eth0) is owners.user
+ meta.set_value('ip_admin_eth0', ['192.168.1.5'], force_default_if_same=True, force_dont_change_value=True)
+ assert meta.ip_admin_eth0 == ['192.168.1.5']
+ assert meta.conf1.ip_admin_eth0 == ['192.168.1.3']
+ assert meta.conf2.ip_admin_eth0 == []
+ assert conf1.getowner(ip_admin_eth0) is owners.user
+ assert conf2.getowner(ip_admin_eth0) is owners.user
+
+
+def test_meta_force_default_and_dont_change():
+ ip_admin_eth0 = StrOption('ip_admin_eth0', "ip", multi=True)
+ netmask_admin_eth0 = StrOption('netmask_admin_eth0', "mask", multi=True, properties=('hidden',))
+ interface1 = OptionDescription('ip_admin_eth0', '', [ip_admin_eth0, netmask_admin_eth0])
+ interface1.impl_set_group_type(groups.master)
+ conf1 = Config(interface1, name='conf1')
+ conf1.read_write()
+ conf2 = Config(interface1, name='conf2')
+ conf2.read_write()
+ meta = MetaConfig([conf1, conf2])
+ meta.read_write()
+ meta.cfgimpl_get_settings().setowner(owners.meta)
+ raises(ValueError, "meta.set_value('ip_admin_eth0', ['192.168.1.4'], force_default=True, force_dont_change_value=True)")
# ____________________________________________________________
"options handler global entry point"
import weakref
-from tiramisu.error import PropertiesOptionError, ConfigError
+from tiramisu.error import PropertiesOptionError, ConfigError, ConflictError
from tiramisu.option import OptionDescription, Option, SymLinkOption, \
DynSymLinkOption
+from tiramisu.option.baseoption import valid_name
from tiramisu.setting import groups, Settings, default_encoding, undefined
from tiramisu.storage import get_storages, get_storage, set_storage, \
_impl_getstate_setting
# ____________________________________________________________
class Config(_CommonConfig):
"main configuration management entry"
- __slots__ = ('__weakref__', '_impl_test')
+ __slots__ = ('__weakref__', '_impl_test', '_impl_name')
- def __init__(self, descr, session_id=None, persistent=False):
+ def __init__(self, descr, session_id=None, persistent=False,
+ name=undefined):
""" Configuration option management master class
:param descr: describes the configuration schema
self._impl_meta = None
#undocumented option used only in test script
self._impl_test = False
+ if name is undefined:
+ name = 'config'
+ if session_id is not None:
+ name += session_id
+ if name is not None and not valid_name(name): # pragma: optional cover
+ raise ValueError(_("invalid name: {0} for config").format(name))
+ self._impl_name = name
def cfgimpl_reset_cache(self,
only_expired=False,
if 'settings' in only:
self.cfgimpl_get_settings().reset_cache(only_expired=only_expired)
+ def impl_getname(self):
+ return self._impl_name
+
+ def impl_setname(self, name):
+ self._impl_name = name
+
class GroupConfig(_CommonConfig):
- __slots__ = ('_impl_children', '__weakref__')
+ __slots__ = ('__weakref__', '_impl_children', '_impl_name')
def __init__(self, children, session_id=None, persistent=False,
- _descr=None):
+ _descr=None, name=undefined):
if not isinstance(children, list):
- raise ValueError(_("metaconfig's children must be a list"))
+ raise ValueError(_("groupconfig's children must be a list"))
+ names = []
+ for child in children:
+ if not isinstance(child, _CommonConfig):
+ raise ValueError(_("groupconfig's children must be Config, MetaConfig or GroupConfig"))
+ name = child._impl_name
+ if name is None:
+ raise ValueError(_('name must be set to config before creating groupconfig'))
+ #if name in names:
+ # raise ValueError(_('config name must be uniq in groupconfig'))
+ names.append(name)
+ if len(names) != len(set(names)):
+ for idx in xrange(1, len(names) + 1):
+ name = names.pop(0)
+ if name in names:
+ raise ConflictError(_('config name must be uniq in '
+ 'groupconfig for {0}').format(name))
self._impl_children = children
settings, values = get_storages(self, session_id, persistent)
self._impl_settings = Settings(self, settings)
self._impl_meta = None
#undocumented option used only in test script
self._impl_test = False
+ if name is undefined:
+ name = session_id
+ self._impl_name = name
def cfgimpl_get_children(self):
return self._impl_children
#def cfgimpl_get_context(self):
# "a meta config is a config which has a setting, that is itself"
# return self
- #
+
def cfgimpl_reset_cache(self,
only_expired=False,
only=('values', 'settings')):
for child in self._impl_children:
child.cfgimpl_reset_cache(only_expired=only_expired, only=only)
- def setattrs(self, path, value):
+ def set_value(self, path, value):
"""Setattr not in current GroupConfig, but in each children
"""
for child in self._impl_children:
try:
- if not isinstance(child, GroupConfig):
- setattr(child, path, value)
+ if isinstance(child, MetaConfig):
+ child.set_value(path, value, only_config=True)
+ elif isinstance(child, GroupConfig):
+ child.set_value(path, value)
else:
- child.setattrs(path, value)
+ setattr(child, path, value)
except PropertiesOptionError:
pass
def find_firsts(self, byname=None, bypath=undefined, byoption=undefined,
- byvalue=undefined, type_='option', display_error=True):
+ byvalue=undefined, display_error=True, _sub=False,
+ check_properties=True):
"""Find first not in current GroupConfig, but in each children
"""
ret = []
+
#if MetaConfig, all children have same OptionDescription in context
#so search only one time the option for all children
- try:
- if bypath is undefined and byname is not None and \
- isinstance(self, MetaConfig):
- bypath = self._find(bytype=None, byvalue=undefined, byname=byname,
- first=True, type_='path',
- check_properties=False,
- display_error=display_error)
- byname = None
- byoption = self.cfgimpl_get_description(
- ).impl_get_opt_by_path(bypath)
- except AttributeError:
- return self._find_return_results([], True)
+ if bypath is undefined and byname is not None and \
+ isinstance(self, MetaConfig):
+ bypath = self._find(bytype=None, byvalue=undefined, byname=byname,
+ first=True, type_='path',
+ check_properties=None,
+ display_error=display_error)
+ byname = None
+ byoption = self.cfgimpl_get_description(
+ ).impl_get_opt_by_path(bypath)
+
for child in self._impl_children:
try:
if isinstance(child, GroupConfig):
- ret.extend(child.find_firsts(byname=byname,
- bypath=bypath,
+ ret.extend(child.find_firsts(byname=byname, bypath=bypath,
byoption=byoption,
byvalue=byvalue,
- type_=type_,
- display_error=False))
+ check_properties=check_properties,
+ display_error=False,
+ _sub=True))
else:
- if type_ == 'config':
- f_type = 'path'
- else:
- f_type = type_
- f_ret = child._find(None, byname, byvalue, first=True,
- type_=f_type, display_error=False,
- only_path=bypath,
- only_option=byoption)
- if type_ == 'config':
- ret.append(child)
- else:
- ret.append(f_ret)
+ child._find(None, byname, byvalue, first=True,
+ type_='path', display_error=False,
+ check_properties=check_properties,
+ only_path=bypath, only_option=byoption)
+ ret.append(child)
except AttributeError:
pass
- return self._find_return_results(ret, display_error)
+ if _sub:
+ return ret
+ else:
+ return GroupConfig(self._find_return_results(ret, display_error))
def __repr__(self):
return object.__repr__(self)
def __str__(self):
- return object.__str__(self)
+ ret = ''
+ for child in self._impl_children:
+ ret += '({0})\n'.format(child._impl_name)
+ try:
+ ret += super(GroupConfig, self).__str__()
+ except ConfigError:
+ pass
+ return ret
+
+ def getattr(self, name, force_permissive=False, validate=True):
+ for child in self._impl_children:
+ if name == child._impl_name:
+ return child
+ return super(GroupConfig, self).getattr(name, force_permissive,
+ validate)
class MetaConfig(GroupConfig):
__slots__ = tuple()
- def __init__(self, children, session_id=None, persistent=False):
+ def __init__(self, children, session_id=None, persistent=False,
+ name=undefined):
descr = None
for child in children:
if not isinstance(child, _CommonConfig):
'have the same optiondescription'))
child._impl_meta = weakref.ref(self)
- super(MetaConfig, self).__init__(children, session_id, persistent, descr)
+ super(MetaConfig, self).__init__(children, session_id, persistent,
+ descr, name)
+
+ def set_value(self, path, value, force_default=False,
+ force_dont_change_value=False, force_default_if_same=False,
+ only_config=False):
+ if only_config:
+ if force_default or force_default_if_same or force_dont_change_value:
+ raise ValueError(_('force_default, force_default_if_same or '
+ 'force_dont_change_value cannot be set with'
+ ' only_config'))
+ return super(MetaConfig, self).set_value(path, value)
+ if force_default or force_default_if_same or force_dont_change_value:
+ if force_default and force_dont_change_value:
+ raise ValueError(_('force_default and force_dont_change_value'
+ ' cannot be set together'))
+ opt = self.cfgimpl_get_description().impl_get_opt_by_path(path)
+ for child in self._impl_children:
+ if force_default_if_same or force_default:
+ if force_default_if_same:
+ if not child.cfgimpl_get_values()._contains(path):
+ child_value = undefined
+ else:
+ child_value = child.getattr(path)
+ if force_default or value == child_value:
+ child.cfgimpl_get_values().reset(opt, path=path,
+ validate=False)
+ continue
+ if force_dont_change_value:
+ child_value = child.getattr(path)
+ if value != child_value:
+ setattr(child, path, child_value)
+
+ setattr(self, path, value)
else: # pragma: no dynoptiondescription cover
return opt == self.master or opt in self.slaves
- def reset(self, opt, values):
- #FIXME pas de opt ???
+ def reset(self, opt, values, validate):
for slave in self.getslaves(opt):
- values.reset(slave)
+ values.reset(slave, validate=validate)
def pop(self, opt, values, index):
- #FIXME pas test de meta ...
for slave in self.getslaves(opt):
if not values.is_default_owner(slave, validate_properties=False,
validate_meta=False):
def _getslave(self, values, opt, path, validate, force_permissive,
force_properties, validate_properties):
+ """
+ if master has length 0:
+ return []
+ if master has length bigger than 0:
+ if default owner:
+ if has callback:
+ if return a list:
+ list same length as master: return list
+ list is smaller than master: return list + None
+ list is greater than master: raise SlaveError
+ if has default value:
+ list same length as master: return list
+ list is smaller than master: return list + None
+ list is greater than master: raise SlaveError
+ if has default_multi value:
+ return default_multi * master's length
+ if has value:
+ list same length as master: return list
+ list is smaller than master: return list + None
+ list is greater than master: raise SlaveError
+ """
+ master = self.getmaster(opt)
+ masterp = master.impl_getpath(values._getcontext())
+ masterlen = self.get_length(values, opt, validate, undefined,
+ undefined, force_permissive,
+ master=master)
+ master_is_meta = values._is_meta(opt, masterp)
value = values._get_validated_value(opt, path, validate,
force_permissive,
force_properties,
validate_properties,
- None) # not undefined
- return self.get_slave_value(values, opt, value, validate,
- validate_properties, force_permissive)
+ None, # not undefined
+ with_meta=master_is_meta)
+ #if slave, had values until master's one
+ path = opt.impl_getpath(values._getcontext())
+ valuelen = len(value)
+ if validate:
+ self.validate_slave_length(masterlen, valuelen,
+ opt.impl_getname(), opt)
+ if valuelen < masterlen:
+ for num in range(0, masterlen - valuelen):
+ index = valuelen + num
+ value.append(values._get_validated_value(opt, path, True,
+ False, None,
+ validate_properties,
+ with_meta=master_is_meta,
+ index=index),
+ setitem=False,
+ force=True)
+ return value
def setitem(self, values, opt, value, path):
if self.is_master(opt):
opt.impl_getname(), opt, setitem=True)
def get_length(self, values, opt, validate=True, slave_path=undefined,
- slave_value=undefined, force_permissive=False):
+ slave_value=undefined, force_permissive=False, master=None,
+ masterp=None):
"""get master len with slave option"""
- masterp = self.getmaster(opt).impl_getpath(values._getcontext())
+ if master is None:
+ master = self.getmaster(opt)
+ if masterp is None:
+ masterp = master.impl_getpath(values._getcontext())
if slave_value is undefined:
slave_path = undefined
- return len(self.getitem(values, self.getmaster(opt), masterp, validate,
+ return len(self.getitem(values, master, masterp, validate,
force_permissive, None, True, slave_path,
slave_value))
raise SlaveError(_("invalid len for the slave: {0}"
" which has {1} as master").format(
name, self.getmaster(opt).impl_getname()))
-
- def get_slave_value(self, values, opt, value, validate=True,
- validate_properties=True, force_permissive=False):
- """
- if master has length 0:
- return []
- if master has length bigger than 0:
- if default owner:
- if has callback:
- if return a list:
- list same length as master: return list
- list is smaller than master: return list + None
- list is greater than master: raise SlaveError
- if has default value:
- list same length as master: return list
- list is smaller than master: return list + None
- list is greater than master: raise SlaveError
- if has default_multi value:
- return default_multi * master's length
- if has value:
- list same length as master: return list
- list is smaller than master: return list + None
- list is greater than master: raise SlaveError
- """
- #if slave, had values until master's one
- path = opt.impl_getpath(values._getcontext())
- masterlen = self.get_length(values, opt, validate, path, value,
- force_permissive)
- valuelen = len(value)
- if validate:
- self.validate_slave_length(masterlen, valuelen, opt.impl_getname(), opt)
- if valuelen < masterlen:
- for num in range(0, masterlen - valuelen):
- index = valuelen + num
- value.append(values._get_validated_value(opt, path, True,
- False, None,
- validate_properties,
- index=index),
- setitem=False,
- force=True)
- return value
setattr(owners, name, owners.Owner(name))
setattr(owners, 'addowner', addowner)
-
# ____________________________________________________________
# populate groups and owners with default attributes
groups = GroupModule()
path = opt.impl_getpath(self._getcontext())
return self._getitem(opt, path)
- def _getitem(self, opt, path):
- return Property(self, self._getproperties(opt, path), opt, path)
+ def _getitem(self, opt, path, self_properties=undefined):
+ return Property(self,
+ self._getproperties(opt, path,
+ self_properties=self_properties),
+ opt, path)
def __setitem__(self, opt, value): # pragma: optional cover
raise ValueError(_('you should only append/remove properties'))
self._p_.delproperties(_path)
self._getcontext().cfgimpl_reset_cache()
- def _getproperties(self, opt=None, path=None, _is_apply_req=True):
+ def _getproperties(self, opt=None, path=None, _is_apply_req=True,
+ self_properties=undefined):
"""
be careful, _is_apply_req doesn't copy properties
"""
if opt is None:
props = copy(self._p_.getproperties(path, default_properties))
else:
+ if self_properties is undefined:
+ self_properties = self._getproperties()
if path is None: # pragma: optional cover
raise ValueError(_('if opt is not None, path should not be'
' None in _getproperties'))
ntime = None
- if 'cache' in self and self._p_.hascache(path):
- if 'expire' in self:
+ if 'cache' in self_properties and self._p_.hascache(path):
+ if 'expire' in self_properties:
ntime = int(time())
is_cached, props = self._p_.getcache(path, ntime)
if is_cached:
if _is_apply_req:
props = copy(props)
props |= self.apply_requires(opt, path)
- if 'cache' in self:
- if 'expire' in self:
+ if 'cache' in self_properties:
+ if 'expire' in self_properties:
if ntime is None:
ntime = int(time())
ntime = ntime + expires_time
#____________________________________________________________
def validate_properties(self, opt_or_descr, is_descr, is_write, path,
value=None, force_permissive=False,
- force_properties=None, force_permissives=None):
+ force_properties=None, force_permissives=None,
+ self_properties=undefined):
"""
validation upon the properties related to `opt_or_descr`
(typically with the `frozen` property)
"""
# opt properties
- properties = self._getproperties(opt_or_descr, path)
- self_properties = self._getproperties()
+ if self_properties is undefined:
+ self_properties = self._getproperties()
+ properties = self._getproperties(opt_or_descr, path,
+ self_properties=self_properties)
# remove opt permissive
# permissive affect option's permission with or without permissive
# global property
raise ConfigError(_('the context does not exist anymore'))
return context
- def _getvalue(self, opt, path, is_default, index=undefined):
+ def _getvalue(self, opt, path, is_default, index=undefined,
+ with_meta=True, setting_properties=undefined):
"""actually retrieves the value
:param opt: the `option.Option()` object
if opt.impl_is_optiondescription(): # pragma: optional cover
raise ValueError(_('optiondescription has no value'))
setting = self._getcontext().cfgimpl_get_settings()
- force_default = 'frozen' in setting._getitem(opt, path) and \
- 'force_default_on_freeze' in setting._getitem(opt, path)
+ force_default = 'frozen' in setting._getitem(opt, path,
+ self_properties=setting_properties) and \
+ 'force_default_on_freeze' in setting._getitem(opt, path,
+ self_properties=setting_properties)
if not is_default and not force_default:
value = self._p_.getvalue(path)
if index is not undefined:
return value
except IndexError:
pass
- meta = self._getcontext().cfgimpl_get_meta()
- if meta is not None:
- #FIXME : problème de longueur si meta + slave
- #doit passer de meta à pas meta
- #en plus il faut gérer la longueur avec les meta !
- #FIXME SymLinkOption
- value = meta.cfgimpl_get_values()._get_cached_item(opt, path)
- if isinstance(value, Multi):
- if index is not undefined:
- value = value[index]
- else:
- value = list(value)
- return value
+ if with_meta:
+ meta = self._getcontext().cfgimpl_get_meta()
+ if meta is not None:
+ #FIXME : possible problème de longueur si slave en SymLinkOption
+ try:
+ value = meta.cfgimpl_get_values(
+ )._get_cached_item(opt, path)
+ if isinstance(value, Multi):
+ if index is not undefined:
+ value = value[index]
+ else:
+ value = list(value)
+ return value
+ except PropertiesOptionError:
+ pass
# now try to get default value
value = opt.impl_getdefault()
if opt.impl_is_multi() and index is not undefined:
"""overrides the builtins `del()` instructions"""
self.reset(opt)
- def reset(self, opt, path=None):
+ def reset(self, opt, path=None, validate=True):
if path is None:
path = opt.impl_getpath(self._getcontext())
context = self._getcontext()
- context.cfgimpl_get_settings().validate_properties(opt, False, True,
- path)
- if self._p_.hasvalue(path):
- setting = context.cfgimpl_get_settings()
- opt.impl_validate(opt.impl_getdefault(),
- context, 'validator' in setting)
+ if validate:
+ context.cfgimpl_get_settings().validate_properties(opt, False,
+ True, path)
+ if self._contains(path):
+ if validate:
+ setting = context.cfgimpl_get_settings()
+ opt.impl_validate(opt.impl_getdefault(),
+ context, 'validator' in setting)
context.cfgimpl_reset_cache()
if opt.impl_is_master_slaves('master'):
- opt.impl_get_master_slaves().reset(opt, self)
+ opt.impl_get_master_slaves().reset(opt, self, validate)
self._p_.resetvalue(path)
def _isempty(self, opt, value):
def _get_cached_item(self, opt, path=None, validate=True,
force_permissive=False, force_properties=None,
- validate_properties=True):
+ validate_properties=True,
+ setting_properties=undefined):
if path is None:
path = opt.impl_getpath(self._getcontext())
ntime = None
- setting = self._getcontext().cfgimpl_get_settings()
- if 'cache' in setting and self._p_.hascache(path):
- if 'expire' in setting:
+ if setting_properties is undefined:
+ setting_properties = self._getcontext().cfgimpl_get_settings(
+ )._getproperties()
+ if 'cache' in setting_properties and self._p_.hascache(path):
+ if 'expire' in setting_properties:
ntime = int(time())
is_cached, value = self._p_.getcache(path, ntime)
if is_cached:
value = Multi(value, self.context, opt, path)
return value
val = self._getitem(opt, path, validate, force_permissive,
- force_properties, validate_properties)
- if 'cache' in setting and validate and validate_properties and \
- force_permissive is False and force_properties is None:
- if 'expire' in setting:
+ force_properties, validate_properties,
+ setting_properties)
+ if 'cache' in setting_properties and validate and validate_properties \
+ and force_permissive is False and force_properties is None:
+ if 'expire' in setting_properties:
if ntime is None:
ntime = int(time())
ntime = ntime + expires_time
return val
def _getitem(self, opt, path, validate, force_permissive, force_properties,
- validate_properties):
+ validate_properties, setting_properties=undefined):
if opt.impl_is_master_slaves():
return opt.impl_get_master_slaves().getitem(self, opt, path,
validate,
return self._get_validated_value(opt, path, validate,
force_permissive,
force_properties,
- validate_properties)
+ validate_properties,
+ setting_properties=setting_properties)
def _get_validated_value(self, opt, path, validate, force_permissive,
force_properties, validate_properties,
- index=undefined, submulti_index=undefined):
+ index=undefined, submulti_index=undefined,
+ with_meta=True, setting_properties=undefined):
"""same has getitem but don't touch the cache
index is None for slave value, if value returned is not a list, just return []
"""
setting = context.cfgimpl_get_settings()
is_default = self._is_default_owner(opt, path,
validate_properties=False,
- validate_meta=False)
+ validate_meta=False,
+ setting_properties=setting_properties)
try:
if index is None:
gv_index = undefined
else:
gv_index = index
- value = self._getvalue(opt, path, is_default, index=gv_index)
+ value = self._getvalue(opt, path, is_default, index=gv_index,
+ with_meta=with_meta,
+ setting_properties=setting_properties)
config_error = None
except ConfigError as err:
# For calculating properties, we need value (ie for mandatory
force_submulti_index = None
else:
force_submulti_index = submulti_index
- opt.impl_validate(value, context, 'validator' in setting,
+ if setting_properties is undefined:
+ setting_properties = setting._getproperties()
+ opt.impl_validate(value, context,
+ 'validator' in setting_properties,
force_index=force_index,
force_submulti_index=force_submulti_index)
#FIXME pas de test avec les metas ...
#FIXME et les symlinkoption ...
if is_default and 'force_store_value' in setting._getitem(opt,
- path):
+ path,
+ self_properties=setting_properties):
if isinstance(value, Multi):
item = list(value)
else:
setting.validate_properties(opt, False, False, value=value,
path=path,
force_permissive=force_permissive,
- force_properties=force_properties)
+ force_properties=force_properties,
+ self_properties=setting_properties)
if config_error is not None:
raise config_error
return value
# user didn't change value, so not write
# valid opt
context = self._getcontext()
+ setting_properties = context.cfgimpl_get_settings()._getproperties()
opt.impl_validate(value, context,
- 'validator' in context.cfgimpl_get_settings())
+ 'validator' in setting_properties)
if opt.impl_is_multi():
- #value = Multi(value, self.context, opt, path)
if opt.impl_is_master_slaves():
opt.impl_get_master_slaves().setitem(self, opt, value, path)
self._setvalue(opt, path, value, force_permissive=force_permissive,
- is_write=is_write)
+ is_write=is_write,
+ setting_properties=setting_properties)
def _setvalue(self, opt, path, value, force_permissive=False,
- is_write=True, validate_properties=True):
+ is_write=True, validate_properties=True,
+ setting_properties=undefined):
context = self._getcontext()
context.cfgimpl_reset_cache()
if validate_properties:
setting = context.cfgimpl_get_settings()
setting.validate_properties(opt, False, is_write,
value=value, path=path,
- force_permissive=force_permissive)
+ force_permissive=force_permissive,
+ self_properties=setting_properties)
owner = context.cfgimpl_get_settings().getowner()
if isinstance(value, Multi):
value = list(value)
value[idx] = list(val)
self._p_.setvalue(path, value, owner)
+ def _is_meta(self, opt, path):
+ context = self._getcontext()
+ setting = context.cfgimpl_get_settings()
+ settings = setting._getitem(opt, path)
+ if 'frozen' in settings and 'force_default_on_freeze' in settings:
+ return False
+ if self._p_.getowner(path, owners.default) is not owners.default:
+ return False
+ if context.cfgimpl_get_meta() is not None:
+ return True
+ return False
+
def getowner(self, opt, force_permissive=False):
"""
retrieves the option's owner
return self._getowner(opt, path, force_permissive=force_permissive)
def _getowner(self, opt, path, validate_properties=True,
- force_permissive=False, validate_meta=True):
- if not isinstance(opt, Option) and not isinstance(opt, DynSymLinkOption):
+ force_permissive=False, validate_meta=undefined,
+ setting_properties=undefined):
+ if not isinstance(opt, Option) and not isinstance(opt,
+ DynSymLinkOption):
raise ConfigError(_('owner only avalaible for an option'))
context = self._getcontext()
setting = context.cfgimpl_get_settings()
- if 'frozen' in setting._getitem(opt, path) and \
- 'force_default_on_freeze' in setting._getitem(opt, path):
+ settings = setting._getitem(opt, path,
+ self_properties=setting_properties)
+ if 'frozen' in settings and 'force_default_on_freeze' in settings:
return owners.default
if validate_properties:
self._getitem(opt, path, True, force_permissive, None, True)
owner = self._p_.getowner(path, owners.default)
+ if validate_meta is undefined:
+ if opt.impl_is_master_slaves('slave'):
+ master = opt.impl_get_master_slaves().getmaster(opt)
+ masterp = master.impl_getpath(context)
+ validate_meta = self._is_meta(opt, masterp)
+ else:
+ validate_meta = True
if validate_meta:
meta = context.cfgimpl_get_meta()
if owner is owners.default and meta is not None:
validate_meta=validate_meta)
def _is_default_owner(self, opt, path, validate_properties=True,
- validate_meta=True):
+ validate_meta=True, setting_properties=undefined):
return self._getowner(opt, path, validate_properties,
- validate_meta=validate_meta) == owners.default
+ validate_meta=validate_meta,
+ setting_properties=setting_properties) == \
+ owners.default
def reset_cache(self, only_expired):
"""