1 # -*- coding: utf-8 -*-
2 "pretty small and local configuration management tool"
3 # Copyright (C) 2012 Team tiramisu (see AUTHORS for all contributors)
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 2 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program; if not, write to the Free Software
17 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19 # The original `Config` design model is unproudly borrowed from
20 # the rough pypy's guys: http://codespeak.net/svn/pypy/dist/pypy/config/
21 # the whole pypy projet is under MIT licence
22 # ____________________________________________________________
24 from tiramisu.error import (PropertiesOptionError, ConfigError, NotFoundError,
25 AmbigousOptionError, ConflictConfigError, NoMatchingOptionFound,
26 MandatoryError, MethodCallError)
27 from tiramisu.option import (OptionDescription, Option, SymLinkOption,
28 group_types, Multi, apply_requires)
29 from tiramisu.autolib import carry_out_calculation
31 # ______________________________________________________________________
32 # generic owner. 'default' is the general config owner after init time
33 default_owner = 'user'
34 # ____________________________________________________________
36 _cfgimpl_properties = ['hidden', 'disabled']
37 _cfgimpl_mandatory = True
38 _cfgimpl_frozen = True
39 _cfgimpl_owner = default_owner
40 _cfgimpl_toplevel = None
41 # TODO implement unicity by name
42 # _cfgimpl_unique_names = True
44 def __init__(self, descr, parent=None, **overrides):
45 self._cfgimpl_descr = descr
46 self._cfgimpl_value_owners = {}
47 self._cfgimpl_parent = parent
48 # `Config()` indeed takes care of the `Option()`'s values
49 self._cfgimpl_values = {}
50 self._cfgimpl_previous_values = {}
51 # XXX warnings are a great idea, let's make up a better use of it
52 self._cfgimpl_warnings = []
53 self._cfgimpl_toplevel = self._cfgimpl_get_toplevel()
54 # `freeze()` allows us to carry out this calculation again if necessary
55 self._cfgimpl_frozen = self._cfgimpl_toplevel._cfgimpl_frozen
56 self._cfgimpl_build(overrides)
58 def _validate_duplicates(self, children):
61 if dup._name not in duplicates:
62 duplicates.append(dup._name)
64 raise ConflictConfigError('duplicate option name: '
65 '{0}'.format(dup._name))
67 # TODO implement unicity by name
68 # def _validate_duplicates_for_names(self, children):
69 # "validates duplicates names agains the whole config"
70 # rootconfig = self._cfgimpl_get_toplevel()
71 # if self._cfgimpl_unique_names:
72 # for dup in children:
76 # print rootconfig.get(dup._name)
77 # except AttributeError:
80 # #rootconfig.get(dup._name)
81 # except NotFoundError:
82 # pass # no identical names, it's fine
84 # raise ConflictConfigError('duplicate option name: '
85 # '{0}'.format(dup._name))
87 def _cfgimpl_build(self, overrides):
88 self._validate_duplicates(self._cfgimpl_descr._children)
89 for child in self._cfgimpl_descr._children:
90 if isinstance(child, Option):
92 childdef = Multi(copy(child.getdefault()), config=self,
94 self._cfgimpl_values[child._name] = childdef
95 self._cfgimpl_previous_values[child._name] = list(childdef)
96 self._cfgimpl_value_owners[child._name] = ['default' \
97 for i in range(len(child.getdefault() ))]
99 childdef = child.getdefault()
100 self._cfgimpl_values[child._name] = childdef
101 self._cfgimpl_previous_values[child._name] = childdef
102 self._cfgimpl_value_owners[child._name] = 'default'
103 elif isinstance(child, OptionDescription):
104 self._validate_duplicates(child._children)
105 self._cfgimpl_values[child._name] = Config(child, parent=self)
106 self.override(overrides)
108 def cfgimpl_update(self):
109 "dynamically adds `Option()` or `OptionDescription()`"
110 # Nothing is static. Everything evolve.
111 # FIXME this is an update for new options in the schema only
112 # see the update_child() method of the descr object
113 for child in self._cfgimpl_descr._children:
114 if isinstance(child, Option):
115 if child._name not in self._cfgimpl_values:
117 self._cfgimpl_values[child._name] = Multi(
118 copy(child.getdefault()), config=self, child=child)
119 self._cfgimpl_value_owners[child._name] = ['default' \
120 for i in range(len(child.getdefault() ))]
122 self._cfgimpl_values[child._name] = copy(child.getdefault())
123 self._cfgimpl_value_owners[child._name] = 'default'
124 elif isinstance(child, OptionDescription):
125 if child._name not in self._cfgimpl_values:
126 self._cfgimpl_values[child._name] = Config(child, parent=self)
128 def override(self, overrides):
129 for name, value in overrides.iteritems():
130 homeconfig, name = self._cfgimpl_get_home_by_path(name)
131 homeconfig.setoption(name, value, 'default')
133 def cfgimpl_set_owner(self, owner):
134 self._cfgimpl_owner = owner
135 for child in self._cfgimpl_descr._children:
136 if isinstance(child, OptionDescription):
137 self._cfgimpl_values[child._name].cfgimpl_set_owner(owner)
138 # ____________________________________________________________
139 def _cfgimpl_has_properties(self):
140 return bool(len(self._cfgimpl_properties))
142 def _cfgimpl_has_property(self, propname):
143 return propname in self._cfgimpl_properties
145 def cfgimpl_enable_property(self, propname):
146 if self._cfgimpl_parent != None:
147 raise MethodCallError("this method root_hide() shall not be"
148 "used with non-root Config() object")
149 if propname not in self._cfgimpl_properties:
150 self._cfgimpl_properties.append(propname)
152 def cfgimpl_disable_property(self, propname):
153 if self._cfgimpl_parent != None:
154 raise MethodCallError("this method root_hide() shall not be"
155 "used with non-root Config() object")
156 if self._cfgimpl_has_property(propname):
157 self._cfgimpl_properties.remove(propname)
159 def cfgimpl_non_mandatory(self):
160 if self._cfgimpl_parent != None:
161 raise MethodCallError("this method root_mandatory machin() shall not be"
162 "used with non-root Confit() object")
163 rootconfig = self._cfgimpl_get_toplevel()
164 rootconfig._cfgimpl_mandatory = False
166 # ____________________________________________________________
167 def __setattr__(self, name, value):
168 if name.startswith('_cfgimpl_'):
169 self.__dict__[name] = value
172 homeconfig, name = self._cfgimpl_get_home_by_path(name)
173 return setattr(homeconfig, name, value)
174 if type(getattr(self._cfgimpl_descr, name)) != SymLinkOption:
175 self._validate(name, getattr(self._cfgimpl_descr, name))
176 self.setoption(name, value, self._cfgimpl_owner)
178 def _validate(self, name, opt_or_descr):
179 apply_requires(opt_or_descr, self)
180 if not isinstance(opt_or_descr, Option) and \
181 not isinstance(opt_or_descr, OptionDescription):
182 raise TypeError('Unexpected object: {0}'.format(repr(opt_or_descr)))
183 properties = opt_or_descr.properties
184 for proper in properties:
185 if not self._cfgimpl_toplevel._cfgimpl_has_property(proper):
186 properties.remove(proper)
188 raise PropertiesOptionError("trying to access"
189 " to an option named: {0} with properties"
190 " {1}".format(name, str(properties)),
193 def _is_empty(self, opt):
194 if (not opt.is_multi() and self._cfgimpl_values[opt._name] == None) or \
195 (opt.is_multi() and (self._cfgimpl_values[opt._name] == [] or \
196 None in self._cfgimpl_values[opt._name])):
200 def __getattr__(self, name):
201 # attribute access by passing a path,
202 # for instance getattr(self, "creole.general.family.adresse_ip_eth0")
204 homeconfig, name = self._cfgimpl_get_home_by_path(name)
205 return getattr(homeconfig, name)
206 opt_or_descr = getattr(self._cfgimpl_descr, name)
208 if type(opt_or_descr) == SymLinkOption:
209 return getattr(self, opt_or_descr.path)
210 if name not in self._cfgimpl_values:
211 raise AttributeError("%s object has no attribute %s" %
212 (self.__class__, name))
213 self._validate(name, opt_or_descr)
215 if name.startswith('_cfgimpl_'):
216 # if it were in __dict__ it would have been found already
217 return self.__dict__[name]
218 raise AttributeError("%s object has no attribute %s" %
219 (self.__class__, name))
220 if not isinstance(opt_or_descr, OptionDescription):
221 # options with callbacks (fill or auto)
222 if opt_or_descr.has_callback():
223 value = self._cfgimpl_values[name]
224 if (not opt_or_descr.is_frozen() or \
225 not opt_or_descr.is_forced_on_freeze()) and value != None:
226 if opt_or_descr.is_multi():
227 if None not in value:
231 result = carry_out_calculation(name,
232 callback=opt_or_descr.getcallback(),
233 callback_params=opt_or_descr.getcallback_params(),
234 config=self._cfgimpl_get_toplevel())
235 # this result **shall not** be a list
236 # for example, [1, 2, 3, None] -> [1, 2, 3, result]
237 if isinstance(result, list):
238 raise ConfigError('invalid calculated value returned'
239 ' for option {0} : shall not be a list'.format(name))
240 if result != None and not opt_or_descr._validate(result):
241 raise ConfigError('invalid calculated value returned'
242 ' for option {0}'.format(name))
243 if opt_or_descr.is_multi():
245 _result = Multi([result], value.config, value.child)
247 _result = Multi([], value.config, value.child)
257 homeconfig = self._cfgimpl_get_toplevel()
258 mandatory = homeconfig._cfgimpl_mandatory
259 if opt_or_descr.is_mandatory() and mandatory:
260 if self._is_empty(opt_or_descr) and \
261 opt_or_descr.is_empty_by_default():
262 raise MandatoryError("option: {0} is mandatory "
263 "and shall have a value".format(name))
264 # frozen and force default
265 if opt_or_descr.is_forced_on_freeze():
266 return opt_or_descr.getdefault()
268 return self._cfgimpl_values[name]
271 #from_type = dir(type(self))
272 from_dict = list(self.__dict__)
273 extras = list(self._cfgimpl_values)
274 return sorted(set(extras + from_dict))
276 def unwrap_from_name(self, name):
277 # didn't have to stoop so low: `self.get()` must be the proper method
278 # **and it is slow**: it recursively searches into the namespaces
279 paths = self.getpaths(allpaths=True)
280 opts = dict([(path, self.unwrap_from_path(path)) for path in paths])
281 all_paths = [p.split(".") for p in self.getpaths()]
282 for pth in all_paths:
284 return opts[".".join(pth)]
285 raise NotFoundError("name: {0} not found".format(name))
287 def unwrap_from_path(self, path):
288 # didn't have to stoop so low, `geattr(self, path)` is much better
289 # **fast**: finds the option directly in the appropriate namespace
291 homeconfig, path = self._cfgimpl_get_home_by_path(path)
292 return getattr(homeconfig._cfgimpl_descr, path)
293 return getattr(self._cfgimpl_descr, path)
295 def __delattr__(self, name):
296 # if you use delattr you are responsible for all bad things happening
297 if name.startswith('_cfgimpl_'):
298 del self.__dict__[name]
300 self._cfgimpl_value_owners[name] = 'default'
301 opt = getattr(self._cfgimpl_descr, name)
302 if isinstance(opt, OptionDescription):
303 raise AttributeError("can't option subgroup")
304 self._cfgimpl_values[name] = getattr(opt, 'default', None)
306 def setoption(self, name, value, who=None):
307 #who is **not necessarily** a owner, because it cannot be a list
308 #FIXME : sortir le setoption pour les multi, ca ne devrait pas être la
309 child = getattr(self._cfgimpl_descr, name)
312 newowner = [self._cfgimpl_owner for i in range(len(value))]
314 newowner = self._cfgimpl_owner
316 if type(child) != SymLinkOption:
318 if type(value) != Multi:
319 if type(value) == list:
320 value = Multi(value, self, child)
322 raise ConfigError("invalid value for option:"
323 " {0} that is set to multi".format(name))
324 newowner = [who for i in range(len(value))]
327 if type(child) != SymLinkOption:
328 if child.has_callback() and who=='default':
329 raise TypeError("trying to set a value to an option "
330 "wich has a callback: {0}".format(name))
331 child.setoption(self, value, who)
332 if (value is None and who != 'default' and not child.is_multi()):
333 child.setowner(self, 'default')
334 self._cfgimpl_values[name] = copy(child.getdefault())
335 elif (value == [] and who != 'default' and child.is_multi()):
336 child.setowner(self, ['default' for i in range(len(child.getdefault()))])
337 self._cfgimpl_values[name] = Multi(copy(child.getdefault()),
338 config=self, child=child)
340 child.setowner(self, newowner)
342 homeconfig = self._cfgimpl_get_toplevel()
343 child.setoption(homeconfig, value, who)
345 def set(self, **kwargs):
346 all_paths = [p.split(".") for p in self.getpaths(allpaths=True)]
347 for key, value in kwargs.iteritems():
348 key_p = key.split('.')
349 candidates = [p for p in all_paths if p[-len(key_p):] == key_p]
350 if len(candidates) == 1:
351 name = '.'.join(candidates[0])
352 homeconfig, name = self._cfgimpl_get_home_by_path(name)
354 getattr(homeconfig, name)
355 except MandatoryError:
358 raise e # HiddenOptionError or DisabledOptionError
359 homeconfig.setoption(name, value, self._cfgimpl_owner)
360 elif len(candidates) > 1:
361 raise AmbigousOptionError(
362 'more than one option that ends with %s' % (key, ))
364 raise NoMatchingOptionFound(
365 'there is no option that matches %s'
366 ' or the option is hidden or disabled'% (key, ))
369 paths = self.getpaths(allpaths=True)
372 pathname = path.split('.')[-1]
375 value = getattr(self, path)
379 raise NotFoundError("option {0} not found in config".format(name))
381 def _cfgimpl_get_home_by_path(self, path):
382 """returns tuple (config, name)"""
383 path = path.split('.')
385 for step in path[:-1]:
386 self = getattr(self, step)
387 return self, path[-1]
389 def _cfgimpl_get_toplevel(self):
390 while self._cfgimpl_parent is not None:
391 self = self._cfgimpl_parent
394 def _cfgimpl_get_path(self):
397 while obj._cfgimpl_parent is not None:
398 subpath.insert(0, obj._cfgimpl_descr._name)
399 obj = obj._cfgimpl_parent
400 return ".".join(subpath)
403 def cfgimpl_previous_value(self, path):
404 home, name = self._cfgimpl_get_home_by_path(path)
405 return home._cfgimpl_previous_values[name]
407 def get_previous_value(self, name):
408 return self._cfgimpl_previous_values[name]
410 def add_warning(self, warning):
411 self._cfgimpl_get_toplevel()._cfgimpl_warnings.append(warning)
413 def get_warnings(self):
414 return self._cfgimpl_get_toplevel()._cfgimpl_warnings
415 # ____________________________________________________________
416 # freeze and read-write statuses
417 def cfgimpl_freeze(self):
418 rootconfig = self._cfgimpl_get_toplevel()
419 rootconfig._cfgimpl_frozen = True
420 self._cfgimpl_frozen = True
422 def cfgimpl_unfreeze(self):
423 rootconfig = self._cfgimpl_get_toplevel()
424 rootconfig._cfgimpl_frozen = False
425 self._cfgimpl_frozen = False
428 # it should be the same value as self._cfgimpl_frozen...
429 rootconfig = self._cfgimpl_get_toplevel()
430 return rootconfig._cfgimpl_frozen
432 def is_mandatory(self):
433 rootconfig = self._cfgimpl_get_toplevel()
434 return rootconfig._cfgimpl_mandatory
436 def cfgimpl_read_only(self):
437 # hung up on freeze, hidden and disabled concepts
438 self.cfgimpl_freeze()
439 rootconfig = self._cfgimpl_get_toplevel()
440 rootconfig.cfgimpl_disable_property('hidden')
441 rootconfig.cfgimpl_enable_property('disabled')
442 rootconfig._cfgimpl_mandatory = True
444 def cfgimpl_read_write(self):
445 # hung up on freeze, hidden and disabled concepts
446 self.cfgimpl_unfreeze()
447 rootconfig = self._cfgimpl_get_toplevel()
448 rootconfig.cfgimpl_enable_property('hidden')
449 rootconfig.cfgimpl_enable_property('disabled')
450 rootconfig._cfgimpl_mandatory = False
451 # ____________________________________________________________
453 return self._cfgimpl_descr.getkey(self)
456 return hash(self.getkey())
458 def __eq__(self, other):
459 return self.getkey() == other.getkey()
461 def __ne__(self, other):
462 return not self == other
465 # iteration only on Options (not OptionDescriptions)
466 for child in self._cfgimpl_descr._children:
467 if isinstance(child, Option):
469 yield child._name, getattr(self, child._name)
471 pass # option with properties
473 def iter_groups(self, group_type=None):
474 "iteration on OptionDescriptions"
475 if group_type == None:
478 if group_type not in group_types:
479 raise TypeError("Unknown group_type: {0}".format(group_type))
480 groups = [group_type]
481 for child in self._cfgimpl_descr._children:
482 if isinstance(child, OptionDescription):
484 if child.get_group_type() in groups:
485 yield child._name, getattr(self, child._name)
487 pass # hidden, disabled option
489 def __str__(self, indent=""):
491 children = [(child._name, child)
492 for child in self._cfgimpl_descr._children]
494 for name, child in children:
495 if self._cfgimpl_value_owners.get(name, None) == 'default':
497 value = getattr(self, name)
498 if isinstance(value, Config):
499 substr = value.__str__(indent + " ")
501 substr = "%s %s = %s" % (indent, name, value)
504 if indent and not lines:
505 return '' # hide subgroups with all default values
506 lines.insert(0, "%s[%s]" % (indent, self._cfgimpl_descr._name,))
507 return '\n'.join(lines)
509 def getpaths(self, include_groups=False, allpaths=False, mandatory=False):
510 """returns a list of all paths in self, recursively, taking care of
511 the context of properties (hidden/disabled)
514 for path in self._cfgimpl_descr.getpaths(include_groups=include_groups):
516 value = getattr(self, path)
518 except MandatoryError:
519 if mandatory or allpaths:
521 except PropertiesOptionError:
523 paths.append(path) # hidden or disabled or mandatory option added
528 def make_dict(config, flatten=False):
529 paths = config.getpaths()
533 pathname = path.split('.')[-1]
537 value = getattr(config, path)
538 pathsvalues.append((pathname, value))
540 pass # this just a hidden or disabled option
541 options = dict(pathsvalues)
544 def mandatory_warnings(config):
545 mandatory = config._cfgimpl_get_toplevel()._cfgimpl_mandatory
546 config._cfgimpl_get_toplevel()._cfgimpl_mandatory = True
547 for path in config._cfgimpl_descr.getpaths(include_groups=True):
549 value = getattr(config, path)
550 except MandatoryError:
552 except PropertiesOptionError:
554 config._cfgimpl_get_toplevel()._cfgimpl_mandatory = mandatory