1 # -*- coding: utf-8 -*-
2 "pretty small and local configuration management tool"
3 # Copyright (C) 2012 Team tiramisu (see AUTHORS for all contributors)
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 2 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program; if not, write to the Free Software
17 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19 # The original `Config` design model is unproudly borrowed from
20 # the rough pypy's guys: http://codespeak.net/svn/pypy/dist/pypy/config/
21 # the whole pypy projet is under MIT licence
22 # ____________________________________________________________
24 from tiramisu.error import (HiddenOptionError, ConfigError, NotFoundError,
25 AmbigousOptionError, ConflictConfigError, NoMatchingOptionFound,
26 SpecialOwnersError, MandatoryError, MethodCallError,
27 DisabledOptionError, ModeOptionError)
28 from tiramisu.option import (OptionDescription, Option, SymLinkOption, group_types,
29 Multi, apply_requires, modes)
30 from tiramisu.autolib import special_owners, special_owner_factory
31 # ______________________________________________________________________
32 # generic owner. 'default' is the general config owner after init time
33 default_owner = 'user'
34 # ____________________________________________________________
36 _cfgimpl_hidden = True
37 _cfgimpl_disabled = True
38 _cfgimpl_mandatory = True
39 _cfgimpl_frozen = False
40 _cfgimpl_owner = default_owner
41 _cfgimpl_toplevel = None
42 _cfgimpl_mode = 'normal'
44 def __init__(self, descr, parent=None, **overrides):
45 self._cfgimpl_descr = descr
46 self._cfgimpl_value_owners = {}
47 self._cfgimpl_parent = parent
48 # `Config()` indeed takes care of the `Option()`'s values
49 self._cfgimpl_values = {}
50 self._cfgimpl_previous_values = {}
51 # XXX warnings are a great idea, let's make up a better use of it
52 self._cfgimpl_warnings = []
53 self._cfgimpl_toplevel = self._cfgimpl_get_toplevel()
54 # `freeze()` allows us to carry out this calculation again if necessary
55 self._cfgimpl_frozen = self._cfgimpl_toplevel._cfgimpl_frozen
56 self._cfgimpl_build(overrides)
58 def _validate_duplicates(self, children):
61 if dup._name not in duplicates:
62 duplicates.append(dup._name)
64 raise ConflictConfigError('duplicate option name: '
65 '{0}'.format(dup._name))
67 def _cfgimpl_build(self, overrides):
68 self._validate_duplicates(self._cfgimpl_descr._children)
69 for child in self._cfgimpl_descr._children:
70 if isinstance(child, Option):
72 childdef = Multi(copy(child.getdefault()), config=self,
74 self._cfgimpl_values[child._name] = childdef
75 self._cfgimpl_previous_values[child._name] = list(childdef)
77 childdef = child.getdefault()
78 self._cfgimpl_values[child._name] = childdef
79 self._cfgimpl_previous_values[child._name] = childdef
80 if child.getcallback() is not None:
81 if child._is_hidden():
82 self._cfgimpl_value_owners[child._name] = 'auto'
84 self._cfgimpl_value_owners[child._name] = 'fill'
87 self._cfgimpl_value_owners[child._name] = ['default' \
88 for i in range(len(child.getdefault() ))]
90 self._cfgimpl_value_owners[child._name] = 'default'
91 elif isinstance(child, OptionDescription):
92 self._validate_duplicates(child._children)
93 self._cfgimpl_values[child._name] = Config(child, parent=self)
94 self.override(overrides)
96 def cfgimpl_update(self):
97 "dynamically adds `Option()` or `OptionDescription()`"
98 # Nothing is static. Everything evolve.
99 # FIXME this is an update for new options in the schema only
100 # see the update_child() method of the descr object
101 for child in self._cfgimpl_descr._children:
102 if isinstance(child, Option):
103 if child._name not in self._cfgimpl_values:
105 self._cfgimpl_values[child._name] = Multi(
106 copy(child.getdefault()), config=self, child=child)
107 self._cfgimpl_value_owners[child._name] = ['default' \
108 for i in range(len(child.getdefault() ))]
110 self._cfgimpl_values[child._name] = copy(child.getdefault())
111 self._cfgimpl_value_owners[child._name] = 'default'
112 elif isinstance(child, OptionDescription):
113 if child._name not in self._cfgimpl_values:
114 self._cfgimpl_values[child._name] = Config(child, parent=self)
116 def override(self, overrides):
117 for name, value in overrides.iteritems():
118 homeconfig, name = self._cfgimpl_get_home_by_path(name)
119 # if there are special_owners, impossible to override
120 if homeconfig._cfgimpl_value_owners[name] in special_owners:
121 raise SpecialOwnersError("cannot override option: {0} because "
122 "of its special owner".format(name))
123 homeconfig.setoption(name, value, 'default')
125 def cfgimpl_set_owner(self, owner):
126 self._cfgimpl_owner = owner
127 for child in self._cfgimpl_descr._children:
128 if isinstance(child, OptionDescription):
129 self._cfgimpl_values[child._name].cfgimpl_set_owner(owner)
130 # ____________________________________________________________
131 def cfgimpl_hide(self):
132 if self._cfgimpl_parent != None:
133 raise MethodCallError("this method root_hide() shall not be"
134 "used with non-root Config() object")
135 rootconfig = self._cfgimpl_get_toplevel()
136 rootconfig._cfgimpl_hidden = True
138 def cfgimpl_show(self):
139 if self._cfgimpl_parent != None:
140 raise MethodCallError("this method root_hide() shall not be"
141 "used with non-root Config() object")
142 rootconfig = self._cfgimpl_get_toplevel()
143 rootconfig._cfgimpl_hidden = False
144 # ____________________________________________________________
145 def cfgimpl_disable(self):
146 if self._cfgimpl_parent != None:
147 raise MethodCallError("this method root_hide() shall not be"
148 "used with non-root Confit() object")
149 rootconfig = self._cfgimpl_get_toplevel()
150 rootconfig._cfgimpl_disabled = True
152 def cfgimpl_enable(self):
153 if self._cfgimpl_parent != None:
154 raise MethodCallError("this method root_hide() shall not be"
155 "used with non-root Confit() object")
156 rootconfig = self._cfgimpl_get_toplevel()
157 rootconfig._cfgimpl_disabled = False
158 # ____________________________________________________________
159 def __setattr__(self, name, value):
161 homeconfig, name = self._cfgimpl_get_home_by_path(name)
162 return setattr(homeconfig, name, value)
164 if name.startswith('_cfgimpl_'):
165 self.__dict__[name] = value
167 if self._cfgimpl_frozen and getattr(self, name) != value:
168 raise TypeError("trying to change a value in a frozen config"
169 ": {0} {1}".format(name, value))
170 if type(getattr(self._cfgimpl_descr, name)) != SymLinkOption:
171 self._validate(name, getattr(self._cfgimpl_descr, name))
172 self.setoption(name, value, self._cfgimpl_owner)
174 def _validate(self, name, opt_or_descr):
175 apply_requires(opt_or_descr, self)
176 if not type(opt_or_descr) == OptionDescription:
178 if self._cfgimpl_toplevel._cfgimpl_hidden and \
179 (opt_or_descr._is_hidden() or self._cfgimpl_descr._is_hidden()):
180 raise HiddenOptionError("trying to access to a hidden option:"
183 if self._cfgimpl_toplevel._cfgimpl_disabled and \
184 (opt_or_descr._is_disabled() or self._cfgimpl_descr._is_disabled()):
185 raise DisabledOptionError("this option is disabled:"
188 # XXX currently doesn't look at the group, is it really necessary ?
189 if self._cfgimpl_toplevel._cfgimpl_mode != 'normal':
190 if opt_or_descr.get_mode() != 'normal':
191 raise ModeOptionError("this option's mode is not normal:"
194 def __getattr__(self, name):
195 # attribute access by passing a path,
196 # for instance getattr(self, "creole.general.family.adresse_ip_eth0")
198 homeconfig, name = self._cfgimpl_get_home_by_path(name)
199 return getattr(homeconfig, name)
200 opt_or_descr = getattr(self._cfgimpl_descr, name)
202 if type(opt_or_descr) == SymLinkOption:
203 return getattr(self, opt_or_descr.path)
204 self._validate(name, opt_or_descr)
206 if name.startswith('_cfgimpl_'):
207 # if it were in __dict__ it would have been found already
208 return self.__dict__[name]
209 raise AttributeError("%s object has no attribute %s" %
210 (self.__class__, name))
211 if name not in self._cfgimpl_values:
212 raise AttributeError("%s object has no attribute %s" %
213 (self.__class__, name))
214 if name in self._cfgimpl_value_owners:
215 owner = self._cfgimpl_value_owners[name]
216 if owner in special_owners:
217 value = self._cfgimpl_values[name]
219 if opt_or_descr.is_multi():
220 if owner == 'fill' and None not in value:
223 if owner == 'fill' and value != None:
225 result = special_owner_factory(name, owner,
227 callback=opt_or_descr.getcallback(),
228 callback_params=opt_or_descr.getcallback_params(),
229 config=self._cfgimpl_get_toplevel())
230 # this result **shall not** be a list
231 # for example, [1, 2, 3, None] -> [1, 2, 3, result]
232 if isinstance(result, list):
233 raise ConfigError('invalid calculated value returned'
234 ' for option {0} : shall not be a list'.format(name))
235 if result != None and not opt_or_descr._validate(result):
236 raise ConfigError('invalid calculated value returned'
237 ' for option {0}'.format(name))
238 if opt_or_descr.is_multi():
240 _result = Multi([result], value.config, value.child)
242 _result = Multi([], value.config, value.child)
251 if not isinstance(opt_or_descr, OptionDescription):
252 homeconfig = self._cfgimpl_get_toplevel()
253 mandatory = homeconfig._cfgimpl_mandatory
254 if opt_or_descr.is_mandatory() and mandatory:
255 if self._cfgimpl_values[name] == None\
256 and opt_or_descr.getdefault() == None:
257 raise MandatoryError("option: {0} is mandatory "
258 "and shall have a value".format(name))
259 return self._cfgimpl_values[name]
262 #from_type = dir(type(self))
263 from_dict = list(self.__dict__)
264 extras = list(self._cfgimpl_values)
265 return sorted(set(extras + from_dict))
267 def unwrap_from_name(self, name):
268 # didn't have to stoop so low: `self.get()` must be the proper method
269 # **and it is slow**: it recursively searches into the namespaces
270 paths = self.getpaths(allpaths=True)
271 opts = dict([(path, self.unwrap_from_path(path)) for path in paths])
272 all_paths = [p.split(".") for p in self.getpaths()]
273 for pth in all_paths:
275 return opts[".".join(pth)]
276 raise NotFoundError("name: {0} not found".format(name))
278 def unwrap_from_path(self, path):
279 # didn't have to stoop so low, `geattr(self, path)` is much better
280 # **fast**: finds the option directly in the appropriate namespace
282 homeconfig, path = self._cfgimpl_get_home_by_path(path)
283 return getattr(homeconfig._cfgimpl_descr, path)
284 return getattr(self._cfgimpl_descr, path)
286 def __delattr__(self, name):
287 # if you use delattr you are responsible for all bad things happening
288 if name.startswith('_cfgimpl_'):
289 del self.__dict__[name]
291 self._cfgimpl_value_owners[name] = 'default'
292 opt = getattr(self._cfgimpl_descr, name)
293 if isinstance(opt, OptionDescription):
294 raise AttributeError("can't option subgroup")
295 self._cfgimpl_values[name] = getattr(opt, 'default', None)
297 def setoption(self, name, value, who=None):
298 #who is **not necessarily** a owner, because it cannot be a list
299 #FIXME : sortir le setoption pour les multi, ca ne devrait pas être la
300 child = getattr(self._cfgimpl_descr, name)
303 newowner = [self._cfgimpl_owner for i in range(len(value))]
305 newowner = self._cfgimpl_owner
307 if type(child) != SymLinkOption:
309 if type(value) != Multi:
310 if type(value) == list:
311 value = Multi(value, self, child)
313 raise ConfigError("invalid value for option:"
314 " {0} that is set to multi".format(name))
315 newowner = [who for i in range(len(value))]
318 if type(child) != SymLinkOption:
319 if name not in self._cfgimpl_values:
320 raise AttributeError('unknown option %s' % (name,))
321 # special owners, a value with a owner *auto* cannot be changed
322 oldowner = self._cfgimpl_value_owners[child._name]
323 if oldowner == 'auto':
325 raise ConflictConfigError('cannot override value to %s for '
326 'option %s' % (value, name))
328 oldvalue = getattr(self, name)
329 if oldvalue == value: #or who in ("default",):
331 child.setoption(self, value, who)
332 # if the value owner is 'auto', set the option to hidden
334 if not child._is_hidden():
336 if (value is None and who != 'default' and not child.is_multi()):
337 child.setowner(self, 'default')
338 self._cfgimpl_values[name] = copy(child.getdefault())
339 elif (value == [] and who != 'default' and child.is_multi()):
340 child.setowner(self, ['default' for i in range(len(child.getdefault()))])
341 self._cfgimpl_values[name] = Multi(copy(child.getdefault()),
342 config=self, child=child)
344 child.setowner(self, newowner)
346 homeconfig = self._cfgimpl_get_toplevel()
347 child.setoption(homeconfig, value, who)
349 def set(self, **kwargs):
350 all_paths = [p.split(".") for p in self.getpaths(allpaths=True)]
351 for key, value in kwargs.iteritems():
352 key_p = key.split('.')
353 candidates = [p for p in all_paths if p[-len(key_p):] == key_p]
354 if len(candidates) == 1:
355 name = '.'.join(candidates[0])
356 homeconfig, name = self._cfgimpl_get_home_by_path(name)
358 getattr(homeconfig, name)
359 except MandatoryError:
362 raise e # HiddenOptionError or DisabledOptionError
363 homeconfig.setoption(name, value, self._cfgimpl_owner)
364 elif len(candidates) > 1:
365 raise AmbigousOptionError(
366 'more than one option that ends with %s' % (key, ))
368 raise NoMatchingOptionFound(
369 'there is no option that matches %s'
370 ' or the option is hidden or disabled'% (key, ))
373 paths = self.getpaths(allpaths=True)
376 pathname = path.split('.')[-1]
379 value = getattr(self, path)
383 raise NotFoundError("option {0} not found in config".format(name))
385 def _cfgimpl_get_home_by_path(self, path):
386 """returns tuple (config, name)"""
387 path = path.split('.')
389 for step in path[:-1]:
390 self = getattr(self, step)
391 return self, path[-1]
393 def _cfgimpl_get_toplevel(self):
394 while self._cfgimpl_parent is not None:
395 self = self._cfgimpl_parent
398 def cfgimpl_previous_value(self, path):
399 home, name = self._cfgimpl_get_home_by_path(path)
400 return home._cfgimpl_previous_values[name]
402 def get_previous_value(self, name):
403 return self._cfgimpl_previous_values[name]
405 def add_warning(self, warning):
406 self._cfgimpl_get_toplevel()._cfgimpl_warnings.append(warning)
408 def get_warnings(self):
409 return self._cfgimpl_get_toplevel()._cfgimpl_warnings
410 # ____________________________________________________________
411 # freeze and read-write statuses
412 def cfgimpl_freeze(self):
413 rootconfig = self._cfgimpl_get_toplevel()
414 rootconfig._cfgimpl_frozen = True
415 self._cfgimpl_frozen = True
417 def cfgimpl_unfreeze(self):
418 rootconfig = self._cfgimpl_get_toplevel()
419 rootconfig._cfgimpl_frozen = False
420 self._cfgimpl_frozen = False
423 # it should be the same value as self._cfgimpl_frozen...
424 rootconfig = self._cfgimpl_get_toplevel()
425 return rootconfig.__dict__['_cfgimpl_frozen']
427 def cfgimpl_read_only(self):
428 # hung up on freeze, hidden and disabled concepts
429 self.cfgimpl_freeze()
430 rootconfig = self._cfgimpl_get_toplevel()
431 rootconfig._cfgimpl_hidden = False
432 rootconfig._cfgimpl_disabled = True
433 rootconfig._cfgimpl_mandatory = True
435 def cfgimpl_set_mode(self, mode):
436 # normal or expert mode
437 rootconfig = self._cfgimpl_get_toplevel()
438 if mode not in modes:
439 raise ConfigError("mode {0} not available".format(mode))
440 rootconfig._cfgimpl_mode = mode
442 def cfgimpl_read_write(self):
443 # hung up on freeze, hidden and disabled concepts
444 self.cfgimpl_unfreeze()
445 rootconfig = self._cfgimpl_get_toplevel()
446 rootconfig._cfgimpl_hidden = True
447 rootconfig._cfgimpl_disabled = True
448 rootconfig._cfgimpl_mandatory = False
449 # ____________________________________________________________
451 return self._cfgimpl_descr.getkey(self)
454 return hash(self.getkey())
456 def __eq__(self, other):
457 return self.getkey() == other.getkey()
459 def __ne__(self, other):
460 return not self == other
463 # iteration only on Options (not OptionDescriptions)
464 for child in self._cfgimpl_descr._children:
465 if isinstance(child, Option):
467 yield child._name, getattr(self, child._name)
469 pass # hidden, disabled option group
471 def iter_groups(self, group_type=None):
472 "iteration on OptionDescriptions"
473 if group_type == None:
476 if group_type not in group_types:
477 raise TypeError("Unknown group_type: {0}".format(group_type))
478 groups = [group_type]
479 for child in self._cfgimpl_descr._children:
480 if isinstance(child, OptionDescription):
482 if child.get_group_type() in groups:
483 yield child._name, getattr(self, child._name)
485 pass # hidden, disabled option
487 def __str__(self, indent=""):
489 children = [(child._name, child)
490 for child in self._cfgimpl_descr._children]
492 for name, child in children:
493 if self._cfgimpl_value_owners.get(name, None) == 'default':
495 value = getattr(self, name)
496 if isinstance(value, Config):
497 substr = value.__str__(indent + " ")
499 substr = "%s %s = %s" % (indent, name, value)
502 if indent and not lines:
503 return '' # hide subgroups with all default values
504 lines.insert(0, "%s[%s]" % (indent, self._cfgimpl_descr._name,))
505 return '\n'.join(lines)
507 def getpaths(self, include_groups=False, allpaths=False):
508 """returns a list of all paths in self, recursively, taking care of
509 the context (hidden/disabled)
512 for path in self._cfgimpl_descr.getpaths(include_groups=include_groups):
514 value = getattr(self, path)
517 pass # hidden or disabled option
519 paths.append(path) # hidden or disabled option added
524 def make_dict(config, flatten=False):
525 paths = config.getpaths()
529 pathname = path.split('.')[-1]
533 value = getattr(config, path)
534 pathsvalues.append((pathname, value))
536 pass # this just a hidden or disabled option
537 options = dict(pathsvalues)
540 def mandatory_warnings(config):
541 for path in config.getpaths():
543 value = getattr(config, path)
544 except MandatoryError: