1 # -*- coding: utf-8 -*-
2 "pretty small and local configuration management tool"
3 # Copyright (C) 2012 Team tiramisu (see AUTHORS for all contributors)
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 2 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program; if not, write to the Free Software
17 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19 # The original `Config` design model is unproudly borrowed from
20 # the rough pypy's guys: http://codespeak.net/svn/pypy/dist/pypy/config/
21 # the whole pypy projet is under MIT licence
22 # ____________________________________________________________
24 from tiramisu.error import (PropertiesOptionError, ConfigError, NotFoundError,
25 AmbigousOptionError, ConflictConfigError, NoMatchingOptionFound,
26 MandatoryError, MethodCallError, NoValueReturned)
27 from tiramisu.option import (OptionDescription, Option, SymLinkOption,
28 Multi, apply_requires)
29 from tiramisu.setting import groups, owners, Setting
30 from tiramisu.value import OptionValues
32 # ____________________________________________________________
34 "main configuration management entry"
35 _cfgimpl_toplevel = None
37 def __init__(self, descr, parent=None, context=None):
38 """ Configuration option management master class
39 :param descr: describes the configuration schema
40 :type descr: an instance of ``option.OptionDescription``
41 :param parent: is None if the ``Config`` is root parent Config otherwise
42 :type parent: ``Config``
43 :param context: the current root config
44 :type context: `Config`
46 self._cfgimpl_descr = descr
47 self._cfgimpl_parent = parent
49 self._cfgimpl_settings = Setting()
50 self._cfgimpl_values = OptionValues()
53 raise ConfigError("cannot find a value for this config")
54 self._cfgimpl_settings = None
55 self._cfgimpl_values = None
57 self._cfgimpl_context = self
59 self._cfgimpl_context = context
60 "warnings are a great idea, let's make up a better use of it"
61 self._cfgimpl_warnings = []
62 self._cfgimpl_toplevel = self._cfgimpl_get_toplevel()
65 def cfgimpl_get_settings(self):
66 return self._cfgimpl_context._cfgimpl_settings
68 def cfgimpl_set_settings(self, settings):
69 if not isinstance(settings, Setting):
70 raise ConfigError("setting not allowed")
71 self._cfgimpl_context._cfgimpl_settings = settings
73 def _validate_duplicates(self, children):
74 """duplicates Option names in the schema
75 :type children: list of `Option` or `OptionDescription`
79 if dup._name not in duplicates:
80 duplicates.append(dup._name)
82 raise ConflictConfigError('duplicate option name: '
83 '{0}'.format(dup._name))
85 def _cfgimpl_build(self):
87 - builds the config object from the schema
88 - settles various default values for options
90 self._validate_duplicates(self._cfgimpl_descr._children)
91 #max len for a master/slave group
93 for child in self._cfgimpl_descr._children:
94 if isinstance(child, Option):
96 childdef = Multi(copy(child.getdefault()), config=self,
98 max_len_child = max(max_len_child, len(childdef))
99 self._cfgimpl_context._cfgimpl_values[child] = childdef
100 self._cfgimpl_context._cfgimpl_values.previous_values[child] = list(childdef)
102 childdef = child.getdefault()
103 self._cfgimpl_context._cfgimpl_values[child] = childdef
104 self._cfgimpl_context._cfgimpl_values.previous_values[child] = childdef
105 child.setowner(self, owners.default)
106 elif isinstance(child, OptionDescription):
107 self._validate_duplicates(child._children)
108 self._cfgimpl_context._cfgimpl_values[child] = Config(child, parent=self,
109 context=self._cfgimpl_context)
111 # def cfgimpl_update(self):
112 # """dynamically adds `Option()` or `OptionDescription()`
114 # # FIXME this is an update for new options in the schema only
115 # # see the update_child() method of the descr object
116 # for child in self._cfgimpl_descr._children:
117 # if isinstance(child, Option):
118 # if child._name not in self._cfgimpl_values:
119 # if child.is_multi():
120 # self._cfgimpl_values[child._name] = Multi(
121 # copy(child.getdefault()), config=self, opt=child)
123 # self._cfgimpl_values[child._name] = copy(child.getdefault())
124 # child.setowner(self, owners.default)
125 # elif isinstance(child, OptionDescription):
126 # if child._name not in self._cfgimpl_values:
127 # self._cfgimpl_values[child._name] = Config(child, parent=self)
129 # ____________________________________________________________
131 def __setattr__(self, name, value):
132 "attribute notation mechanism for the setting of the value of an option"
133 if name.startswith('_cfgimpl_'):
134 self.__dict__[name] = value
137 homeconfig, name = self._cfgimpl_get_home_by_path(name)
138 return setattr(homeconfig, name, value)
139 if type(getattr(self._cfgimpl_descr, name)) != SymLinkOption:
140 self._validate(name, getattr(self._cfgimpl_descr, name))
141 self.setoption(name, value,
142 self._cfgimpl_context._cfgimpl_settings.get_owner())
144 def _validate(self, name, opt_or_descr, permissive=False):
145 "validation for the setattr and the getattr"
146 apply_requires(opt_or_descr, self, permissive=permissive)
147 if not isinstance(opt_or_descr, Option) and \
148 not isinstance(opt_or_descr, OptionDescription):
149 raise TypeError('Unexpected object: {0}'.format(repr(opt_or_descr)))
150 properties = copy(opt_or_descr.properties)
151 for proper in copy(properties):
152 if not self._cfgimpl_context._cfgimpl_settings.has_property(proper):
153 properties.remove(proper)
155 for perm in self._cfgimpl_context._cfgimpl_settings.permissive:
156 if perm in properties:
157 properties.remove(perm)
159 raise PropertiesOptionError("trying to access"
160 " to an option named: {0} with properties"
161 " {1}".format(name, str(properties)),
164 def _is_empty(self, opt):
165 "convenience method to know if an option is empty"
166 if (not opt.is_multi() and self._cfgimpl_context._cfgimpl_values[opt] == None) or \
167 (opt.is_multi() and (self._cfgimpl_context._cfgimpl_values[opt] == [] or \
168 None in self._cfgimpl_context._cfgimpl_values[opt])):
172 def _test_mandatory(self, path, opt):
174 mandatory = self._cfgimpl_context._cfgimpl_settings.mandatory
175 if opt.is_mandatory() and mandatory:
176 if self._is_empty(opt) and \
177 opt.is_empty_by_default():
178 raise MandatoryError("option: {0} is mandatory "
179 "and shall have a value".format(path))
181 def __getattr__(self, name):
182 return self._getattr(name)
184 def fill_multi(self, opt, result, use_default_multi=False, default_multi=None):
185 """fills a multi option with default and calculated values
187 # FIXME C'EST ENCORE DU N'IMPORTE QUOI
188 value = self._cfgimpl_context._cfgimpl_values[opt]
189 if not isinstance(result, list):
193 return Multi(_result, value.config, opt=value.opt)
195 def _getattr(self, name, permissive=False):
197 attribute notation mechanism for accessing the value of an option
198 :param name: attribute name
199 :param permissive: permissive doesn't raise some property error
201 :return: option's value if name is an option name, OptionDescription
204 # attribute access by passing a path,
205 # for instance getattr(self, "creole.general.family.adresse_ip_eth0")
207 homeconfig, name = self._cfgimpl_get_home_by_path(name)
208 return homeconfig._getattr(name, permissive)
209 opt_or_descr = getattr(self._cfgimpl_descr, name)
211 if type(opt_or_descr) == SymLinkOption:
212 rootconfig = self._cfgimpl_get_toplevel()
213 return getattr(rootconfig, opt_or_descr.path)
214 if opt_or_descr not in self._cfgimpl_context._cfgimpl_values:
215 raise AttributeError("%s object has no attribute %s" %
216 (self.__class__, name))
217 self._validate(name, opt_or_descr, permissive)
219 if name.startswith('_cfgimpl_'):
220 # if it were in __dict__ it would have been found already
221 return self.__dict__[name]
222 raise AttributeError("%s object has no attribute %s" %
223 (self.__class__, name))
224 if not isinstance(opt_or_descr, OptionDescription):
225 # options with callbacks
226 if opt_or_descr.has_callback():
227 value = self._cfgimpl_context._cfgimpl_values[opt_or_descr]
228 if (not opt_or_descr.is_frozen() or \
229 not opt_or_descr.is_forced_on_freeze()) and \
230 not opt_or_descr.is_default_owner(self):
233 result = opt_or_descr.getcallback_value(
234 self._cfgimpl_get_toplevel())
235 except NoValueReturned, err:
238 if opt_or_descr.is_multi():
239 _result = self.fill_multi(opt_or_descr, result)
241 # this result **shall not** be a list
242 if isinstance(result, list):
243 raise ConfigError('invalid calculated value returned'
244 ' for option {0} : shall not be a list'.format(name))
246 if _result != None and not opt_or_descr.validate(_result,
247 self._cfgimpl_context._cfgimpl_settings.validator):
248 raise ConfigError('invalid calculated value returned'
249 ' for option {0}'.format(name))
250 self._cfgimpl_context._cfgimpl_values[opt_or_descr] = _result
251 opt_or_descr.setowner(self, owners.default)
252 # frozen and force default
253 if not opt_or_descr.has_callback() and opt_or_descr.is_forced_on_freeze():
254 value = opt_or_descr.getdefault()
255 if opt_or_descr.is_multi():
256 value = self.fill_multi(opt_or_descr, value,
257 use_default_multi=True,
258 default_multi=opt_or_descr.getdefault_multi())
259 self._cfgimpl_context._cfgimpl_values[opt_or_descr] = value
260 opt_or_descr.setowner(self, owners.default)
261 self._test_mandatory(name, opt_or_descr)
262 value = self._cfgimpl_context._cfgimpl_values[opt_or_descr]
265 def unwrap_from_name(self, name):
266 """convenience method to extract and Option() object from the Config()
267 **and it is slow**: it recursively searches into the namespaces
271 paths = self.getpaths(allpaths=True)
272 opts = dict([(path, self.unwrap_from_path(path)) for path in paths])
273 all_paths = [p.split(".") for p in self.getpaths()]
274 for pth in all_paths:
276 return opts[".".join(pth)]
277 raise NotFoundError("name: {0} not found".format(name))
279 def unwrap_from_path(self, path):
280 """convenience method to extract and Option() object from the Config()
281 and it is **fast**: finds the option directly in the appropriate
287 homeconfig, path = self._cfgimpl_get_home_by_path(path)
288 return getattr(homeconfig._cfgimpl_descr, path)
289 return getattr(self._cfgimpl_descr, path)
291 def setoption(self, name, value, who=None):
292 """effectively modifies the value of an Option()
293 (typically called by the __setattr__)
294 :param who : an object that lives in `setting.owners`
296 child = getattr(self._cfgimpl_descr, name)
297 if type(child) != SymLinkOption:
299 who = self._cfgimpl_context._cfgimpl_settings.owner
301 if not isinstance(who, owners.DefaultOwner):
302 if type(value) != Multi:
303 if type(value) == list:
304 value = Multi(value, self, child)
306 raise ConfigError("invalid value for option:"
307 " {0} that is set to multi".format(name))
309 value = self.fill_multi(child, child.getdefault(),
310 use_default_multi=True,
311 default_multi=child.getdefault_multi())
312 if not isinstance(who, owners.Owner):
313 raise TypeError("invalid owner [{0}] for option: {1}".format(
315 child.setoption(self, value)
316 child.setowner(self, who)
318 homeconfig = self._cfgimpl_get_toplevel()
319 child.setoption(homeconfig, value)
321 def set(self, **kwargs):
323 do what I mean"-interface to option setting. Searches all paths
324 starting from that config for matches of the optional arguments
325 and sets the found option if the match is not ambiguous.
326 :param kwargs: dict of name strings to values.
328 all_paths = [p.split(".") for p in self.getpaths(allpaths=True)]
329 for key, value in kwargs.iteritems():
330 key_p = key.split('.')
331 candidates = [p for p in all_paths if p[-len(key_p):] == key_p]
332 if len(candidates) == 1:
333 name = '.'.join(candidates[0])
334 homeconfig, name = self._cfgimpl_get_home_by_path(name)
336 getattr(homeconfig, name)
337 except MandatoryError:
340 raise e # HiddenOptionError or DisabledOptionError
341 homeconfig.setoption(name, value,
342 self._cfgimpl_context._cfgimpl_settings.get_owner())
343 elif len(candidates) > 1:
344 raise AmbigousOptionError(
345 'more than one option that ends with %s' % (key, ))
347 raise NoMatchingOptionFound(
348 'there is no option that matches %s'
349 ' or the option is hidden or disabled'% (key, ))
353 same as a `find_first()` method in a config that has identical names:
354 it returns the first item of an option named `name`
356 much like the attribute access way, except that
357 the search for the option is performed recursively in the whole
359 **carefull**: very slow !
361 :returns: option value.
363 paths = self.getpaths(allpaths=True)
366 pathname = path.split('.')[-1]
369 value = getattr(self, path)
373 raise NotFoundError("option {0} not found in config".format(name))
375 def _cfgimpl_get_home_by_path(self, path):
376 """:returns: tuple (config, name)"""
377 path = path.split('.')
378 for step in path[:-1]:
379 self = getattr(self, step)
380 return self, path[-1]
382 def _cfgimpl_get_toplevel(self):
383 ":returns: root config"
384 while self._cfgimpl_parent is not None:
385 self = self._cfgimpl_parent
388 def _cfgimpl_get_path(self):
389 "the path in the attribute access meaning."
392 while obj._cfgimpl_parent is not None:
393 subpath.insert(0, obj._cfgimpl_descr._name)
394 obj = obj._cfgimpl_parent
395 return ".".join(subpath)
396 # ______________________________________________________________________
397 # def cfgimpl_previous_value(self, path):
398 # "stores the previous value"
399 # home, name = self._cfgimpl_get_home_by_path(path)
400 # # FIXME fucking name
401 # return home._cfgimpl_context._cfgimpl_values.previous_values[name]
403 # def get_previous_value(self, name):
404 # "for the time being, only the previous Option's value is accessible"
405 # return self._cfgimpl_context._cfgimpl_values.previous_values[name]
406 # ______________________________________________________________________
407 def add_warning(self, warning):
408 "Config implements its own warning pile. Could be useful"
409 self._cfgimpl_get_toplevel()._cfgimpl_warnings.append(warning)
411 def get_warnings(self):
412 "Config implements its own warning pile"
413 return self._cfgimpl_get_toplevel()._cfgimpl_warnings
414 # ____________________________________________________________
416 return self._cfgimpl_descr.getkey(self)
419 return hash(self.getkey())
421 def __eq__(self, other):
423 if not isinstance(other, Config):
425 return self.getkey() == other.getkey()
427 def __ne__(self, other):
429 return not self == other
430 # ______________________________________________________________________
432 """Pythonesque way of parsing group's ordered options.
433 iteration only on Options (not OptionDescriptions)"""
434 for child in self._cfgimpl_descr._children:
435 if not isinstance(child, OptionDescription):
437 yield child._name, getattr(self, child._name)
439 pass # option with properties
441 def iter_groups(self, group_type=None):
442 """iteration on groups objects only.
443 All groups are returned if `group_type` is `None`, otherwise the groups
444 can be filtered by categories (families, or whatever).
445 :param group_type: if defined, is an instance of `groups.GroupType`
446 or `groups.MasterGroupType` that lives in
450 if group_type is not None:
451 if not isinstance(group_type, groups.GroupType):
452 raise TypeError("Unknown group_type: {0}".format(group_type))
453 for child in self._cfgimpl_descr._children:
454 if isinstance(child, OptionDescription):
456 if group_type is not None:
457 if child.get_group_type() == group_type:
458 yield child._name, getattr(self, child._name)
460 yield child._name, getattr(self, child._name)
463 # ______________________________________________________________________
465 "Config's string representation"
467 for name, grp in self.iter_groups():
468 lines.append("[%s]" % name)
469 for name, value in self:
471 lines.append("%s = %s" % (name, value))
474 return '\n'.join(lines)
479 def getpaths(self, include_groups=False, allpaths=False, mandatory=False):
480 """returns a list of all paths in self, recursively, taking care of
481 the context of properties (hidden/disabled)
483 :param include_groups: if true, OptionDescription are included
484 :param allpaths: all the options (event the properties protected ones)
485 :param mandatory: includes the mandatory options
486 :returns: list of all paths
489 for path in self._cfgimpl_descr.getpaths(include_groups=include_groups):
491 value = getattr(self, path)
493 except MandatoryError:
494 if mandatory or allpaths:
496 except PropertiesOptionError:
498 paths.append(path) # option which have properties added
503 def _find(self, bytype, byname, byvalue, byattrs, first):
505 convenience method for finding an option that lives only in the subtree
507 :param first: return only one option if True, a list otherwise
508 :return: find list or an exception if nothing has been found
510 def _filter_by_attrs():
513 for key, value in byattrs.items():
514 if not hasattr(option, key):
517 if getattr(option, key) != value:
522 def _filter_by_name():
525 pathname = path.split('.')[-1]
526 if pathname == byname:
530 def _filter_by_value():
534 value = getattr(self, path)
537 except: # a property restricts the access of the value
540 def _filter_by_type():
543 if isinstance(option, bytype):
548 paths = self.getpaths(allpaths=True)
551 option = self.unwrap_from_path(path)
552 except PropertiesOptionError, err:
554 if not _filter_by_name():
556 if not _filter_by_value():
558 if not _filter_by_type():
560 if not _filter_by_attrs():
565 find_results.append(option)
567 if find_results == []:
568 raise NotFoundError("no option found in config with these criteria")
572 def find(self, bytype=None, byname=None, byvalue=None, byattrs=None):
574 finds a list of options recursively in the config
576 :param bytype: Option class (BoolOption, StrOption, ...)
577 :param byname: filter by Option._name
578 :param byvalue: filter by the option's value
579 :param byattrs: dict of option attributes (default, callback...)
580 :returns: list of matching Option objects
582 return self._find(bytype, byname, byvalue, byattrs, first=False)
584 def find_first(self, bytype=None, byname=None, byvalue=None, byattrs=None):
586 finds an option recursively in the config
588 :param bytype: Option class (BoolOption, StrOption, ...)
589 :param byname: filter by Option._name
590 :param byvalue: filter by the option's value
591 :param byattrs: dict of option attributes (default, callback...)
592 :returns: list of matching Option objects
594 return self._find(bytype, byname, byvalue, byattrs, first=True)
596 def make_dict(config, flatten=False):
597 """export the whole config into a `dict`
598 :returns: dict of Option's name (or path) and values"""
599 paths = config.getpaths()
603 pathname = path.split('.')[-1]
607 value = getattr(config, path)
608 pathsvalues.append((pathname, value))
610 pass # this just a hidden or disabled option
611 options = dict(pathsvalues)
614 def mandatory_warnings(config):
615 """convenience function to trace Options that are mandatory and
616 where no value has been set
618 :returns: generator of mandatory Option's path
619 FIXME : CAREFULL : not multi-user
621 mandatory = config._cfgimpl_context._cfgimpl_settings.mandatory
622 config._cfgimpl_context._cfgimpl_settings.mandatory = True
623 for path in config._cfgimpl_descr.getpaths(include_groups=True):
625 value = config._getattr(path, permissive=True)
626 except MandatoryError:
628 except PropertiesOptionError:
630 config._cfgimpl_context._cfgimpl_settings.mandatory = mandatory