tests on frozen and None value
[tiramisu.git] / tiramisu / config.py
1 # -*- coding: utf-8 -*-
2 "pretty small and local configuration management tool"
3 # Copyright (C) 2012 Team tiramisu (see AUTHORS for all contributors)
4 #
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 2 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program; if not, write to the Free Software
17 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
18 #
19 # The original `Config` design model is unproudly borrowed from 
20 # the rough pypy's guys: http://codespeak.net/svn/pypy/dist/pypy/config/
21 # the whole pypy projet is under MIT licence
22 # ____________________________________________________________
23 from copy import copy
24 from tiramisu.error import (PropertiesOptionError, ConfigError, NotFoundError, 
25     AmbigousOptionError, ConflictConfigError, NoMatchingOptionFound, 
26     MandatoryError, MethodCallError)
27 from tiramisu.option import (OptionDescription, Option, SymLinkOption, 
28     group_types, Multi, apply_requires)
29 from tiramisu.autolib import carry_out_calculation
30
31 # ______________________________________________________________________
32 # generic owner. 'default' is the general config owner after init time
33 default_owner = 'user'
34 # ____________________________________________________________
35 class Config(object):
36     _cfgimpl_properties = ['hidden', 'disabled']
37     _cfgimpl_mandatory = True
38     _cfgimpl_frozen = True
39     _cfgimpl_owner = default_owner
40     _cfgimpl_toplevel = None
41 # TODO implement unicity by name
42 #    _cfgimpl_unique_names = True
43     
44     def __init__(self, descr, parent=None, **overrides):
45         self._cfgimpl_descr = descr
46         self._cfgimpl_value_owners = {}
47         self._cfgimpl_parent = parent
48         # `Config()` indeed takes care of the `Option()`'s values
49         self._cfgimpl_values = {}
50         self._cfgimpl_previous_values = {}
51         # XXX warnings are a great idea, let's make up a better use of it 
52         self._cfgimpl_warnings = []
53         self._cfgimpl_toplevel = self._cfgimpl_get_toplevel()
54         # `freeze()` allows us to carry out this calculation again if necessary 
55         self._cfgimpl_frozen = self._cfgimpl_toplevel._cfgimpl_frozen 
56         self._cfgimpl_build(overrides)
57
58     def _validate_duplicates(self, children):
59         duplicates = []
60         for dup in children:
61             if dup._name not in duplicates:
62                 duplicates.append(dup._name)
63             else:
64                 raise ConflictConfigError('duplicate option name: ' 
65                     '{0}'.format(dup._name))
66
67 # TODO implement unicity by name
68 #    def _validate_duplicates_for_names(self, children):
69 #        "validates duplicates names agains the whole config"
70 #        rootconfig = self._cfgimpl_get_toplevel()
71 #        if self._cfgimpl_unique_names:
72 #            for dup in children:
73 #                try:
74 #                    print dup._name
75 #                    try:
76 #                        print rootconfig.get(dup._name)
77 #                    except AttributeError:
78 #                        pass
79 #                    raise NotFoundError
80 #                    #rootconfig.get(dup._name)
81 #                except NotFoundError:
82 #                    pass # no identical names, it's fine
83 #                else:
84 #                    raise ConflictConfigError('duplicate option name: ' 
85 #                        '{0}'.format(dup._name))
86         
87     def _cfgimpl_build(self, overrides):
88         self._validate_duplicates(self._cfgimpl_descr._children)
89         for child in self._cfgimpl_descr._children:
90             if isinstance(child, Option):
91                 if child.is_multi():
92                     childdef = Multi(copy(child.getdefault()), config=self, 
93                                      child=child)
94                     self._cfgimpl_values[child._name] = childdef
95                     self._cfgimpl_previous_values[child._name] = list(childdef)
96                     self._cfgimpl_value_owners[child._name] = ['default' \
97                         for i in range(len(child.getdefault() ))]
98                 else:
99                     childdef = child.getdefault()
100                     self._cfgimpl_values[child._name] = childdef
101                     self._cfgimpl_previous_values[child._name] = childdef 
102                     self._cfgimpl_value_owners[child._name] = 'default'
103             elif isinstance(child, OptionDescription):
104                 self._validate_duplicates(child._children)
105                 self._cfgimpl_values[child._name] = Config(child, parent=self)
106         self.override(overrides)
107
108     def cfgimpl_update(self):
109         "dynamically adds `Option()` or `OptionDescription()`"
110         # Nothing is static. Everything evolve.
111         # FIXME this is an update for new options in the schema only 
112         # see the update_child() method of the descr object 
113         for child in self._cfgimpl_descr._children:
114             if isinstance(child, Option):
115                 if child._name not in self._cfgimpl_values:
116                     if child.is_multi():
117                         self._cfgimpl_values[child._name] = Multi(
118                                 copy(child.getdefault()), config=self, child=child)
119                         self._cfgimpl_value_owners[child._name] = ['default' \
120                                 for i in range(len(child.getdefault() ))]
121                     else:
122                         self._cfgimpl_values[child._name] = copy(child.getdefault())
123                         self._cfgimpl_value_owners[child._name] = 'default'
124             elif isinstance(child, OptionDescription):
125                 if child._name not in self._cfgimpl_values:
126                     self._cfgimpl_values[child._name] = Config(child, parent=self)
127
128     def override(self, overrides):
129         for name, value in overrides.iteritems():
130             homeconfig, name = self._cfgimpl_get_home_by_path(name)
131             homeconfig.setoption(name, value, 'default')
132
133     def cfgimpl_set_owner(self, owner):
134         self._cfgimpl_owner = owner
135         for child in self._cfgimpl_descr._children:
136             if isinstance(child, OptionDescription):
137                 self._cfgimpl_values[child._name].cfgimpl_set_owner(owner)
138     # ____________________________________________________________
139     def _cfgimpl_has_properties(self):
140         return bool(len(self._cfgimpl_properties))
141
142     def _cfgimpl_has_property(self, propname):
143         return propname in self._cfgimpl_properties
144
145     def cfgimpl_enable_property(self, propname):
146         if self._cfgimpl_parent != None:
147             raise MethodCallError("this method root_hide() shall not be"
148                                            "used with non-root Config() object") 
149         if propname not in self._cfgimpl_properties:
150             self._cfgimpl_properties.append(propname)
151     
152     def cfgimpl_disable_property(self, propname):
153         if self._cfgimpl_parent != None:
154             raise MethodCallError("this method root_hide() shall not be"
155                                            "used with non-root Config() object") 
156         if self._cfgimpl_has_property(propname):
157             self._cfgimpl_properties.remove(propname)
158
159     def cfgimpl_non_mandatory(self):
160         if self._cfgimpl_parent != None:
161             raise MethodCallError("this method root_mandatory machin() shall not be"
162                                            "used with non-root Confit() object") 
163         rootconfig = self._cfgimpl_get_toplevel()
164         rootconfig._cfgimpl_mandatory = False
165
166     # ____________________________________________________________
167     def __setattr__(self, name, value):
168         if name.startswith('_cfgimpl_'):
169             self.__dict__[name] = value
170             return
171         if '.' in name:
172             homeconfig, name = self._cfgimpl_get_home_by_path(name)
173             return setattr(homeconfig, name, value)
174         if type(getattr(self._cfgimpl_descr, name)) != SymLinkOption:
175             self._validate(name, getattr(self._cfgimpl_descr, name))
176         self.setoption(name, value, self._cfgimpl_owner)
177         
178     def _validate(self, name, opt_or_descr):
179         apply_requires(opt_or_descr, self) 
180         if not isinstance(opt_or_descr, Option) and \
181                 not isinstance(opt_or_descr, OptionDescription):
182             raise TypeError('Unexpected object: {0}'.format(repr(opt_or_descr)))
183         properties = opt_or_descr.properties
184         for proper in properties:
185             if not self._cfgimpl_toplevel._cfgimpl_has_property(proper):
186                 properties.remove(proper)
187         if properties != []:
188             raise PropertiesOptionError("trying to access"
189                     " to an option named: {0} with properties"
190                     " {1}".format(name, str(properties)), 
191                     properties)
192
193     def _is_empty(self, opt):
194         if (not opt.is_multi() and self._cfgimpl_values[opt._name] == None) or \
195             (opt.is_multi() and (self._cfgimpl_values[opt._name] == [] or \
196                 None in self._cfgimpl_values[opt._name])):
197             return True
198         return False
199
200     def __getattr__(self, name):
201         # attribute access by passing a path, 
202         # for instance getattr(self, "creole.general.family.adresse_ip_eth0") 
203         if '.' in name:
204             homeconfig, name = self._cfgimpl_get_home_by_path(name)
205             return getattr(homeconfig, name)
206         opt_or_descr = getattr(self._cfgimpl_descr, name)
207         # symlink options 
208         if type(opt_or_descr) == SymLinkOption:
209             return getattr(self, opt_or_descr.path)
210         if name not in self._cfgimpl_values:
211             raise AttributeError("%s object has no attribute %s" %
212                                  (self.__class__, name))
213         self._validate(name, opt_or_descr)
214         # special attributes
215         if name.startswith('_cfgimpl_'):
216             # if it were in __dict__ it would have been found already
217             return self.__dict__[name]
218             raise AttributeError("%s object has no attribute %s" %
219                                  (self.__class__, name))
220         if not isinstance(opt_or_descr, OptionDescription):
221             # options with callbacks (fill or auto) 
222             if opt_or_descr.has_callback():
223                 value = self._cfgimpl_values[name]
224                 if (not opt_or_descr.is_frozen() or \
225                         not opt_or_descr.is_forced_on_freeze()) and value != None:
226                     if opt_or_descr.is_multi():
227                         if None not in value:
228                             return value
229                     else:
230                         return value
231                 result = carry_out_calculation(name, 
232                             callback=opt_or_descr.getcallback(),
233                             callback_params=opt_or_descr.getcallback_params(),
234                             config=self._cfgimpl_get_toplevel())
235                 # this result **shall not** be a list 
236                 # for example, [1, 2, 3, None] -> [1, 2, 3, result]
237                 if isinstance(result, list):
238                     raise ConfigError('invalid calculated value returned'
239                         ' for option {0} : shall not be a list'.format(name))
240                 if result != None and not opt_or_descr._validate(result):
241                     raise ConfigError('invalid calculated value returned'
242                         ' for option {0}'.format(name))
243                 if opt_or_descr.is_multi():
244                     if value == []:
245                         _result = Multi([result], value.config, value.child)
246                     else:
247                         _result = Multi([], value.config, value.child)
248                         for val in value:
249                             if val == None:
250                                 val = result
251                             _result.append(val)
252                 else:
253                     _result = result
254                 return _result
255
256             # mandatory options
257             homeconfig = self._cfgimpl_get_toplevel()
258             mandatory = homeconfig._cfgimpl_mandatory
259             if opt_or_descr.is_mandatory() and mandatory:
260                 if self._is_empty(opt_or_descr) and \
261                         opt_or_descr.is_empty_by_default():
262                     raise MandatoryError("option: {0} is mandatory " 
263                                           "and shall have a value".format(name))
264             # frozen and force default
265             if opt_or_descr.is_forced_on_freeze():
266                 return opt_or_descr.getdefault()
267         
268         return self._cfgimpl_values[name]
269                 
270     def __dir__(self):
271         #from_type = dir(type(self))
272         from_dict = list(self.__dict__)
273         extras = list(self._cfgimpl_values)
274         return sorted(set(extras + from_dict))
275
276     def unwrap_from_name(self, name):
277         # didn't have to stoop so low: `self.get()` must be the proper method
278         # **and it is slow**: it recursively searches into the namespaces
279         paths = self.getpaths(allpaths=True)
280         opts = dict([(path, self.unwrap_from_path(path)) for path in paths])
281         all_paths = [p.split(".") for p in self.getpaths()]
282         for pth in all_paths:
283             if name in pth:
284                 return opts[".".join(pth)]
285         raise NotFoundError("name: {0} not found".format(name))
286         
287     def unwrap_from_path(self, path):
288         # didn't have to stoop so low, `geattr(self, path)` is much better
289         # **fast**: finds the option directly in the appropriate namespace
290         if '.' in path:
291             homeconfig, path = self._cfgimpl_get_home_by_path(path)
292             return getattr(homeconfig._cfgimpl_descr, path)
293         return getattr(self._cfgimpl_descr, path)
294                                             
295     def __delattr__(self, name):
296         # if you use delattr you are responsible for all bad things happening
297         if name.startswith('_cfgimpl_'):
298             del self.__dict__[name]
299             return
300         self._cfgimpl_value_owners[name] = 'default'
301         opt = getattr(self._cfgimpl_descr, name)
302         if isinstance(opt, OptionDescription):
303             raise AttributeError("can't option subgroup")
304         self._cfgimpl_values[name] = getattr(opt, 'default', None)
305
306     def setoption(self, name, value, who=None):
307         #who is **not necessarily** a owner, because it cannot be a list
308         #FIXME : sortir le setoption pour les multi, ca ne devrait pas être la
309         child = getattr(self._cfgimpl_descr, name)
310         if who == None:
311             if child.is_multi():
312                 newowner = [self._cfgimpl_owner for i in range(len(value))] 
313             else:
314                 newowner = self._cfgimpl_owner
315         else:
316             if type(child) != SymLinkOption:
317                 if child.is_multi():
318                     if type(value) != Multi:
319                         if type(value) == list:
320                             value = Multi(value, self, child)
321                         else:
322                             raise ConfigError("invalid value for option:"
323                                        " {0} that is set to multi".format(name))
324                     newowner = [who for i in range(len(value))]
325                 else:
326                     newowner = who 
327         if type(child) != SymLinkOption:
328             if child.has_callback() and who=='default':
329                 raise TypeError("trying to set a value to an option "
330                     "wich has a callback: {0}".format(name))
331             child.setoption(self, value, who)
332             if (value is None and who != 'default' and not child.is_multi()):
333                 child.setowner(self, 'default')
334                 self._cfgimpl_values[name] = copy(child.getdefault())
335             elif (value == [] and who != 'default' and child.is_multi()):
336                 child.setowner(self, ['default' for i in range(len(child.getdefault()))])
337                 self._cfgimpl_values[name] = Multi(copy(child.getdefault()),
338                             config=self, child=child)
339             else:         
340                 child.setowner(self, newowner)
341         else:
342             homeconfig = self._cfgimpl_get_toplevel()
343             child.setoption(homeconfig, value, who)
344
345     def set(self, **kwargs):
346         all_paths = [p.split(".") for p in self.getpaths(allpaths=True)]
347         for key, value in kwargs.iteritems():
348             key_p = key.split('.')
349             candidates = [p for p in all_paths if p[-len(key_p):] == key_p]
350             if len(candidates) == 1:
351                 name = '.'.join(candidates[0])
352                 homeconfig, name = self._cfgimpl_get_home_by_path(name)
353                 try:
354                     getattr(homeconfig, name)
355                 except MandatoryError:
356                     pass
357                 except Exception, e:
358                     raise e # HiddenOptionError or DisabledOptionError
359                 homeconfig.setoption(name, value, self._cfgimpl_owner)
360             elif len(candidates) > 1:
361                 raise AmbigousOptionError(
362                     'more than one option that ends with %s' % (key, ))
363             else:
364                 raise NoMatchingOptionFound(
365                     'there is no option that matches %s' 
366                     ' or the option is hidden or disabled'% (key, ))
367
368     def get(self, name):
369         paths = self.getpaths(allpaths=True)
370         pathsvalues = []
371         for path in paths:
372             pathname = path.split('.')[-1]
373             if pathname == name:
374                 try:
375                     value = getattr(self, path)            
376                     return value 
377                 except Exception, e:
378                     raise e
379         raise NotFoundError("option {0} not found in config".format(name))                    
380
381     def _cfgimpl_get_home_by_path(self, path):
382         """returns tuple (config, name)"""
383         path = path.split('.')
384         
385         for step in path[:-1]:
386             self = getattr(self, step)
387         return self, path[-1]
388
389     def _cfgimpl_get_toplevel(self):
390         while self._cfgimpl_parent is not None:
391             self = self._cfgimpl_parent
392         return self
393     
394     def _cfgimpl_get_path(self):
395         subpath = []
396         obj = self
397         while obj._cfgimpl_parent is not None:
398             subpath.insert(0, obj._cfgimpl_descr._name)
399             obj = obj._cfgimpl_parent
400         return ".".join(subpath)
401
402
403     def cfgimpl_previous_value(self, path):
404         home, name = self._cfgimpl_get_home_by_path(path)
405         return home._cfgimpl_previous_values[name]
406     
407     def get_previous_value(self, name):
408         return self._cfgimpl_previous_values[name]
409              
410     def add_warning(self, warning):
411         self._cfgimpl_get_toplevel()._cfgimpl_warnings.append(warning)
412
413     def get_warnings(self):
414         return self._cfgimpl_get_toplevel()._cfgimpl_warnings
415     # ____________________________________________________________
416     # freeze and read-write statuses
417     def cfgimpl_freeze(self):
418         rootconfig = self._cfgimpl_get_toplevel()
419         rootconfig._cfgimpl_frozen = True
420         self._cfgimpl_frozen = True
421
422     def cfgimpl_unfreeze(self):
423         rootconfig = self._cfgimpl_get_toplevel()
424         rootconfig._cfgimpl_frozen = False
425         self._cfgimpl_frozen = False
426
427     def is_frozen(self):
428         # it should be the same value as self._cfgimpl_frozen...
429         rootconfig = self._cfgimpl_get_toplevel()
430         return rootconfig._cfgimpl_frozen
431
432     def is_mandatory(self):
433         rootconfig = self._cfgimpl_get_toplevel()
434         return rootconfig._cfgimpl_mandatory
435
436     def cfgimpl_read_only(self):
437         # hung up on freeze, hidden and disabled concepts 
438         self.cfgimpl_freeze()
439         rootconfig = self._cfgimpl_get_toplevel()
440         rootconfig.cfgimpl_disable_property('hidden')
441         rootconfig.cfgimpl_enable_property('disabled')
442         rootconfig._cfgimpl_mandatory = True
443
444     def cfgimpl_read_write(self):
445         # hung up on freeze, hidden and disabled concepts
446         self.cfgimpl_unfreeze()
447         rootconfig = self._cfgimpl_get_toplevel()
448         rootconfig.cfgimpl_enable_property('hidden')
449         rootconfig.cfgimpl_disable_property('disabled')
450         rootconfig._cfgimpl_mandatory = False
451     # ____________________________________________________________
452     def getkey(self):
453         return self._cfgimpl_descr.getkey(self)
454
455     def __hash__(self):
456         return hash(self.getkey())
457
458     def __eq__(self, other):
459         return self.getkey() == other.getkey()
460
461     def __ne__(self, other):
462         return not self == other
463
464     def __iter__(self):
465         # iteration only on Options (not OptionDescriptions)
466         for child in self._cfgimpl_descr._children:
467             if isinstance(child, Option):
468                 try:
469                     yield child._name, getattr(self, child._name)
470                 except:
471                     pass # option with properties
472
473     def iter_groups(self, group_type=None):
474         "iteration on OptionDescriptions"
475         if group_type == None:
476             groups = group_types
477         else:
478             if group_type not in group_types:
479                 raise TypeError("Unknown group_type: {0}".format(group_type))
480             groups = [group_type]
481         for child in self._cfgimpl_descr._children:
482             if isinstance(child, OptionDescription):
483                     try:
484                         if child.get_group_type() in groups: 
485                             yield child._name, getattr(self, child._name)
486                     except:
487                         pass # hidden, disabled option
488                     
489     def __str__(self, indent=""):
490         lines = []
491         children = [(child._name, child)
492                     for child in self._cfgimpl_descr._children]
493         children.sort()
494         for name, child in children:
495             if self._cfgimpl_value_owners.get(name, None) == 'default':
496                 continue
497             value = getattr(self, name)
498             if isinstance(value, Config):
499                 substr = value.__str__(indent + "    ")
500             else:
501                 substr = "%s    %s = %s" % (indent, name, value)
502             if substr:
503                 lines.append(substr)
504         if indent and not lines:
505             return ''   # hide subgroups with all default values
506         lines.insert(0, "%s[%s]" % (indent, self._cfgimpl_descr._name,))
507         return '\n'.join(lines)
508
509     def getpaths(self, include_groups=False, allpaths=False, mandatory=False):
510         """returns a list of all paths in self, recursively, taking care of 
511         the context of properties (hidden/disabled)
512         """
513         paths = []
514         for path in self._cfgimpl_descr.getpaths(include_groups=include_groups):
515             try: 
516                 value = getattr(self, path)
517                 
518             except MandatoryError:
519                 if mandatory or allpaths:
520                     paths.append(path)
521             except PropertiesOptionError:
522                 if allpaths:
523                     paths.append(path) # hidden or disabled or mandatory option added
524             else:
525                  paths.append(path)
526         return paths 
527         
528 def make_dict(config, flatten=False):
529     paths = config.getpaths()
530     pathsvalues = []
531     for path in paths:
532         if flatten:
533             pathname = path.split('.')[-1]
534         else:
535             pathname = path
536         try:
537             value = getattr(config, path)            
538             pathsvalues.append((pathname, value))      
539         except:
540             pass # this just a hidden or disabled option
541     options = dict(pathsvalues)
542     return options
543
544 def mandatory_warnings(config):
545     mandatory = config._cfgimpl_get_toplevel()._cfgimpl_mandatory
546     config._cfgimpl_get_toplevel()._cfgimpl_mandatory = True
547     for path in config._cfgimpl_descr.getpaths(include_groups=True):
548         try:
549             value = getattr(config, path)
550         except MandatoryError:
551             yield path
552         except PropertiesOptionError:
553             pass
554     config._cfgimpl_get_toplevel()._cfgimpl_mandatory = mandatory