554f1f9acf212ae55a910a162216d8c57059ab17
[tiramisu.git] / tiramisu / option / option.py
1 # -*- coding: utf-8 -*-
2 "option types and option description"
3 # Copyright (C) 2012-2013 Team tiramisu (see AUTHORS for all contributors)
4 #
5 # This program is free software: you can redistribute it and/or modify it
6 # under the terms of the GNU Lesser General Public License as published by the
7 # Free Software Foundation, either version 3 of the License, or (at your
8 # option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful, but WITHOUT
11 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
12 # FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
13 # details.
14 #
15 # You should have received a copy of the GNU Lesser General Public License
16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # The original `Config` design model is unproudly borrowed from
19 # the rough pypy's guys: http://codespeak.net/svn/pypy/dist/pypy/config/
20 # the whole pypy projet is under MIT licence
21 # ____________________________________________________________
22 import re
23 import sys
24 from IPy import IP
25 from types import FunctionType
26 from tiramisu.setting import log, undefined
27
28 from tiramisu.error import ConfigError, ContextError
29 from tiramisu.i18n import _
30 from .baseoption import Option, validate_callback
31 from tiramisu.autolib import carry_out_calculation
32
33
34 class ChoiceOption(Option):
35     """represents a choice out of several objects.
36
37     The option can also have the value ``None``
38     """
39     __slots__ = tuple()
40
41     def __init__(self, name, doc, values, default=None,
42                  values_params=None, default_multi=None, requires=None,
43                  multi=False, callback=None, callback_params=None,
44                  validator=None, validator_params=None,
45                  properties=None, warnings_only=False):
46         """
47         :param values: is a list of values the option can possibly take
48         """
49         if isinstance(values, FunctionType):
50             validate_callback(values, values_params, 'values')
51         else:
52             if values_params is not None:
53                 raise ValueError(_('values is not a function, so values_params must be None'))
54             if not isinstance(values, tuple):  # pragma: optional cover
55                 raise TypeError(_('values must be a tuple or a function for {0}'
56                                   ).format(name))
57         self._choice_values = values
58         self._choice_values_params = values_params
59         super(ChoiceOption, self).__init__(name, doc, default=default,
60                                            default_multi=default_multi,
61                                            callback=callback,
62                                            callback_params=callback_params,
63                                            requires=requires,
64                                            multi=multi,
65                                            validator=validator,
66                                            validator_params=validator_params,
67                                            properties=properties,
68                                            warnings_only=warnings_only)
69
70     def impl_get_values(self, context):
71         #FIXME cache? but in context...
72         values = self._choice_values
73         if isinstance(values, FunctionType):
74             values_params = self._choice_values_params
75             if values_params is None:
76                 values_params = {}
77             values = carry_out_calculation(self, context=context,
78                                            callback=values,
79                                            callback_params=values_params)
80             if not isinstance(values, list):  # pragma: optional cover
81                 raise ConfigError(_('calculated values for {0} is not a list'
82                                     '').format(self.impl_getname()))
83         return values
84
85     def _validate(self, value, context=undefined):
86         try:
87             values = self.impl_get_values(context)
88             if not value in values:  # pragma: optional cover
89                 raise ValueError(_('value {0} is not permitted, '
90                                  'only {1} is allowed'
91                                  '').format(value,
92                                             values))
93         except ContextError:  # pragma: optional cover
94             log.debug('ChoiceOption validation, disabled because no context')
95
96
97 class BoolOption(Option):
98     "represents a choice between ``True`` and ``False``"
99     __slots__ = tuple()
100
101     def _validate(self, value, context=undefined):
102         if not isinstance(value, bool):
103             raise ValueError(_('invalid boolean'))  # pragma: optional cover
104
105
106 class IntOption(Option):
107     "represents a choice of an integer"
108     __slots__ = tuple()
109
110     def _validate(self, value, context=undefined):
111         if not isinstance(value, int):
112             raise ValueError(_('invalid integer'))  # pragma: optional cover
113
114
115 class FloatOption(Option):
116     "represents a choice of a floating point number"
117     __slots__ = tuple()
118
119     def _validate(self, value, context=undefined):
120         if not isinstance(value, float):
121             raise ValueError(_('invalid float'))  # pragma: optional cover
122
123
124 class StrOption(Option):
125     "represents the choice of a string"
126     __slots__ = tuple()
127
128     def _validate(self, value, context=undefined):
129         if not isinstance(value, str):
130             raise ValueError(_('invalid string'))  # pragma: optional cover
131
132
133 if sys.version_info[0] >= 3:  # pragma: optional cover
134     #UnicodeOption is same as StrOption in python 3+
135     class UnicodeOption(StrOption):
136         __slots__ = tuple()
137         pass
138 else:
139     class UnicodeOption(Option):
140         "represents the choice of a unicode string"
141         __slots__ = tuple()
142         _empty = u''
143
144         def _validate(self, value, context=undefined):
145             if not isinstance(value, unicode):
146                 raise ValueError(_('invalid unicode'))  # pragma: optional cover
147
148
149 class IPOption(Option):
150     "represents the choice of an ip"
151     __slots__ = tuple()
152
153     def __init__(self, name, doc, default=None, default_multi=None,
154                  requires=None, multi=False, callback=None,
155                  callback_params=None, validator=None, validator_params=None,
156                  properties=None, private_only=False, allow_reserved=False,
157                  warnings_only=False):
158         extra = {'_private_only': private_only,
159                  '_allow_reserved': allow_reserved}
160         super(IPOption, self).__init__(name, doc, default=default,
161                                        default_multi=default_multi,
162                                        callback=callback,
163                                        callback_params=callback_params,
164                                        requires=requires,
165                                        multi=multi,
166                                        validator=validator,
167                                        validator_params=validator_params,
168                                        properties=properties,
169                                        warnings_only=warnings_only,
170                                        extra=extra)
171
172     def _validate(self, value, context=undefined):
173         # sometimes an ip term starts with a zero
174         # but this does not fit in some case, for example bind does not like it
175         self._impl_valid_unicode(value)
176         try:
177             for val in value.split('.'):
178                 if val.startswith("0") and len(val) > 1:
179                     raise ValueError(_('invalid IP'))  # pragma: optional cover
180         except AttributeError:  # pragma: optional cover
181             #if integer for example
182             raise ValueError(_('invalid IP'))
183         # 'standard' validation
184         try:
185             IP('{0}/32'.format(value))
186         except ValueError:  # pragma: optional cover
187             raise ValueError(_('invalid IP'))
188
189     def _second_level_validation(self, value, warnings_only):
190         ip = IP('{0}/32'.format(value))
191         if not self._get_extra('_allow_reserved') and ip.iptype() == 'RESERVED':  # pragma: optional cover
192             if warnings_only:
193                 msg = _("IP is in reserved class")
194             else:
195                 msg = _("invalid IP, mustn't be in reserved class")
196             raise ValueError(msg)
197         if self._get_extra('_private_only') and not ip.iptype() == 'PRIVATE':  # pragma: optional cover
198             if warnings_only:
199                 msg = _("IP is not in private class")
200             else:
201                 msg = _("invalid IP, must be in private class")
202             raise ValueError(msg)
203
204     def _cons_in_network(self, opts, vals, warnings_only):
205         if len(vals) != 3:
206             raise ConfigError(_('invalid len for vals'))  # pragma: optional cover
207         if None in vals:
208             return
209         ip, network, netmask = vals
210         if IP(ip) not in IP('{0}/{1}'.format(network, netmask)):  # pragma: optional cover
211             if warnings_only:
212                 msg = _('IP {0} ({1}) not in network {2} ({3}) with netmask {4}'
213                         ' ({5})')
214             else:
215                 msg = _('invalid IP {0} ({1}) not in network {2} ({3}) with '
216                         'netmask {4} ({5})')
217             raise ValueError(msg.format(ip, opts[0].impl_getname(), network,
218                              opts[1].impl_getname(), netmask, opts[2].impl_getname()))
219         # test if ip is not network/broadcast IP
220         opts[2]._cons_ip_netmask((opts[2], opts[0]), (netmask, ip), warnings_only)
221
222
223 class PortOption(Option):
224     """represents the choice of a port
225     The port numbers are divided into three ranges:
226     the well-known ports,
227     the registered ports,
228     and the dynamic or private ports.
229     You can actived this three range.
230     Port number 0 is reserved and can't be used.
231     see: http://en.wikipedia.org/wiki/Port_numbers
232     """
233     __slots__ = tuple()
234
235     def __init__(self, name, doc, default=None, default_multi=None,
236                  requires=None, multi=False, callback=None,
237                  callback_params=None, validator=None, validator_params=None,
238                  properties=None, allow_range=False, allow_zero=False,
239                  allow_wellknown=True, allow_registred=True,
240                  allow_private=False, warnings_only=False):
241         extra = {'_allow_range': allow_range,
242                  '_min_value': None,
243                  '_max_value': None}
244         ports_min = [0, 1, 1024, 49152]
245         ports_max = [0, 1023, 49151, 65535]
246         is_finally = False
247         for index, allowed in enumerate([allow_zero,
248                                          allow_wellknown,
249                                          allow_registred,
250                                          allow_private]):
251             if extra['_min_value'] is None:
252                 if allowed:
253                     extra['_min_value'] = ports_min[index]
254             elif not allowed:
255                 is_finally = True
256             elif allowed and is_finally:
257                 raise ValueError(_('inconsistency in allowed range'))  # pragma: optional cover
258             if allowed:
259                 extra['_max_value'] = ports_max[index]
260
261         if extra['_max_value'] is None:
262             raise ValueError(_('max value is empty'))  # pragma: optional cover
263
264         super(PortOption, self).__init__(name, doc, default=default,
265                                          default_multi=default_multi,
266                                          callback=callback,
267                                          callback_params=callback_params,
268                                          requires=requires,
269                                          multi=multi,
270                                          validator=validator,
271                                          validator_params=validator_params,
272                                          properties=properties,
273                                          warnings_only=warnings_only,
274                                          extra=extra)
275
276     def _validate(self, value, context=undefined):
277         if isinstance(value, int):
278             value = unicode(value)
279         self._impl_valid_unicode(value)
280         if self._get_extra('_allow_range') and ":" in str(value):  # pragma: optional cover
281             value = str(value).split(':')
282             if len(value) != 2:
283                 raise ValueError(_('invalid port, range must have two values '
284                                  'only'))
285             if not value[0] < value[1]:
286                 raise ValueError(_('invalid port, first port in range must be'
287                                  ' smaller than the second one'))
288         else:
289             value = [value]
290
291         for val in value:
292             try:
293                 val = int(val)
294             except ValueError:  # pragma: optional cover
295                 raise ValueError(_('invalid port'))
296             if not self._get_extra('_min_value') <= val <= self._get_extra('_max_value'):  # pragma: optional cover
297                 raise ValueError(_('invalid port, must be an integer between {0} '
298                                    'and {1}').format(self._get_extra('_min_value'),
299                                                      self._get_extra('_max_value')))
300
301
302 class NetworkOption(Option):
303     "represents the choice of a network"
304     __slots__ = tuple()
305
306     def _validate(self, value, context=undefined):
307         self._impl_valid_unicode(value)
308         try:
309             IP(value)
310         except ValueError:  # pragma: optional cover
311             raise ValueError(_('invalid network address'))
312
313     def _second_level_validation(self, value, warnings_only):
314         ip = IP(value)
315         if ip.iptype() == 'RESERVED':  # pragma: optional cover
316             if warnings_only:
317                 msg = _("network address is in reserved class")
318             else:
319                 msg = _("invalid network address, mustn't be in reserved class")
320             raise ValueError(msg)
321
322
323 class NetmaskOption(Option):
324     "represents the choice of a netmask"
325     __slots__ = tuple()
326
327     def _validate(self, value, context=undefined):
328         self._impl_valid_unicode(value)
329         try:
330             IP('0.0.0.0/{0}'.format(value))
331         except ValueError:  # pragma: optional cover
332             raise ValueError(_('invalid netmask address'))
333
334     def _cons_network_netmask(self, opts, vals, warnings_only):
335         #opts must be (netmask, network) options
336         if None in vals:
337             return
338         self.__cons_netmask(opts, vals[0], vals[1], False, warnings_only)
339
340     def _cons_ip_netmask(self, opts, vals, warnings_only):
341         #opts must be (netmask, ip) options
342         if None in vals:
343             return
344         self.__cons_netmask(opts, vals[0], vals[1], True, warnings_only)
345
346     def __cons_netmask(self, opts, val_netmask, val_ipnetwork, make_net,
347                        warnings_only):
348         if len(opts) != 2:
349             raise ConfigError(_('invalid len for opts'))  # pragma: optional cover
350         msg = None
351         try:
352             ip = IP('{0}/{1}'.format(val_ipnetwork, val_netmask),
353                     make_net=make_net)
354             #if cidr == 32, ip same has network
355             if make_net and ip.prefixlen() != 32:
356                 val_ip = IP(val_ipnetwork)
357                 if ip.net() == val_ip:
358                     msg = _("invalid IP {0} ({1}) with netmask {2},"
359                             " this IP is a network")
360                 if ip.broadcast() == val_ip:
361                     msg = _("invalid IP {0} ({1}) with netmask {2},"
362                             " this IP is a broadcast")
363
364         except ValueError:  # pragma: optional cover
365             if not make_net:
366                 msg = _('invalid network {0} ({1}) with netmask {2}')
367         if msg is not None:  # pragma: optional cover
368             raise ValueError(msg.format(val_ipnetwork, opts[1].impl_getname(),
369                                         val_netmask))
370
371
372 class BroadcastOption(Option):
373     __slots__ = tuple()
374
375     def _validate(self, value, context=undefined):
376         self._impl_valid_unicode(value)
377         try:
378             IP('{0}/32'.format(value))
379         except ValueError:  # pragma: optional cover
380             raise ValueError(_('invalid broadcast address'))
381
382     def _cons_broadcast(self, opts, vals, warnings_only):
383         if len(vals) != 3:
384             raise ConfigError(_('invalid len for vals'))  # pragma: optional cover
385         if None in vals:
386             return
387         broadcast, network, netmask = vals
388         if IP('{0}/{1}'.format(network, netmask)).broadcast() != IP(broadcast):
389             raise ValueError(_('invalid broadcast {0} ({1}) with network {2} '
390                                '({3}) and netmask {4} ({5})').format(
391                                    broadcast, opts[0].impl_getname(), network,
392                                    opts[1].impl_getname(), netmask, opts[2].impl_getname()))  # pragma: optional cover
393
394
395 class DomainnameOption(Option):
396     """represents the choice of a domain name
397     netbios: for MS domain
398     hostname: to identify the device
399     domainname:
400     fqdn: with tld, not supported yet
401     """
402     __slots__ = tuple()
403
404     def __init__(self, name, doc, default=None, default_multi=None,
405                  requires=None, multi=False, callback=None,
406                  callback_params=None, validator=None, validator_params=None,
407                  properties=None, allow_ip=False, type_='domainname',
408                  warnings_only=False, allow_without_dot=False):
409         if type_ not in ['netbios', 'hostname', 'domainname']:
410             raise ValueError(_('unknown type_ {0} for hostname').format(type_))  # pragma: optional cover
411         extra = {'_dom_type': type_}
412         if allow_ip not in [True, False]:
413             raise ValueError(_('allow_ip must be a boolean'))  # pragma: optional cover
414         if allow_without_dot not in [True, False]:
415             raise ValueError(_('allow_without_dot must be a boolean'))  # pragma: optional cover
416         extra['_allow_ip'] = allow_ip
417         extra['_allow_without_dot'] = allow_without_dot
418         extra['_domain_re'] = re.compile(r'^[a-z\d][a-z\d\-]*$')
419         extra['_has_upper'] = re.compile('[A-Z]')
420
421         super(DomainnameOption, self).__init__(name, doc, default=default,
422                                                default_multi=default_multi,
423                                                callback=callback,
424                                                callback_params=callback_params,
425                                                requires=requires,
426                                                multi=multi,
427                                                validator=validator,
428                                                validator_params=validator_params,
429                                                properties=properties,
430                                                warnings_only=warnings_only,
431                                                extra=extra)
432
433     def _validate(self, value, context=undefined):
434         self._impl_valid_unicode(value)
435         def _valid_length(val):
436             if len(val) < 2:
437                 raise ValueError(_("invalid domainname's length (min 2)"))
438             if len(val) > part_name_length:
439                 raise ValueError(_("invalid domainname's length (max {0})"
440                                    "").format(part_name_length))
441
442         if self._get_extra('_allow_ip') is True:  # pragma: optional cover
443             try:
444                 IP('{0}/32'.format(value))
445                 return
446             except ValueError:
447                 pass
448         if self._get_extra('_dom_type') == 'netbios':
449             part_name_length = 15
450         else:
451             part_name_length = 63
452         if self._get_extra('_dom_type') == 'domainname':
453             if not self._get_extra('_allow_without_dot') and not "." in value:
454                 raise ValueError(_("invalid domainname, must have dot"))
455             if len(value) > 255:
456                 raise ValueError(_("invalid domainname's length (max 255)"))
457             for dom in value.split('.'):
458                 _valid_length(dom)
459         else:
460             _valid_length(value)
461
462     def _second_level_validation(self, value, warnings_only):
463         def _valid_char(val):
464             if self._get_extra('_has_upper').search(val):
465                 raise ValueError(_('some characters are uppercase'))
466             if not self._get_extra('_domain_re').search(val):
467                 if warnings_only:
468                     raise ValueError(_('some characters may cause problems'))
469                 else:
470                     raise ValueError(_('invalid domainname'))
471         #not for IP
472         if self._get_extra('_allow_ip') is True:
473             try:
474                 IP('{0}/32'.format(value))
475                 return
476             except ValueError:
477                 pass
478         if self._get_extra('_dom_type') == 'domainname':
479             for dom in value.split('.'):
480                 _valid_char(dom)
481         else:
482             _valid_char(value)
483
484
485 class EmailOption(DomainnameOption):
486     __slots__ = tuple()
487     username_re = re.compile(r"^[\w!#$%&'*+\-/=?^`{|}~.]+$")
488
489     def _validate(self, value, context=undefined):
490         self._impl_valid_unicode(value)
491         splitted = value.split('@', 1)
492         try:
493             username, domain = splitted
494         except ValueError:  # pragma: optional cover
495             raise ValueError(_('invalid email address, must contains one @'
496                                ))
497         if not self.username_re.search(username):
498             raise ValueError(_('invalid username in email address'))  # pragma: optional cover
499         super(EmailOption, self)._validate(domain)
500         super(EmailOption, self)._second_level_validation(domain, False)
501
502     def _second_level_validation(self, value, warnings_only):
503         pass
504
505
506 class URLOption(DomainnameOption):
507     __slots__ = tuple()
508     proto_re = re.compile(r'(http|https)://')
509     path_re = re.compile(r"^[A-Za-z0-9\-\._~:/\?#\[\]@!%\$&\'\(\)\*\+,;=]+$")
510
511     def _validate(self, value, context=undefined):
512         self._impl_valid_unicode(value)
513         match = self.proto_re.search(value)
514         if not match:  # pragma: optional cover
515             raise ValueError(_('invalid url, must start with http:// or '
516                                'https://'))
517         value = value[len(match.group(0)):]
518         # get domain/files
519         splitted = value.split('/', 1)
520         try:
521             domain, files = splitted
522         except ValueError:
523             domain = value
524             files = None
525         # if port in domain
526         splitted = domain.split(':', 1)
527         try:
528             domain, port = splitted
529
530         except ValueError:  # pragma: optional cover
531             domain = splitted[0]
532             port = 0
533         if not 0 <= int(port) <= 65535:
534             raise ValueError(_('invalid url, port must be an between 0 and '
535                                '65536'))  # pragma: optional cover
536         # validate domainname
537         super(URLOption, self)._validate(domain)
538         super(URLOption, self)._second_level_validation(domain, False)
539         # validate file
540         if files is not None and files != '' and not self.path_re.search(files):
541             raise ValueError(_('invalid url, must ends with a valid resource name'))  # pragma: optional cover
542
543     def _second_level_validation(self, value, warnings_only):
544         pass
545
546
547 class UsernameOption(Option):
548     __slots__ = tuple()
549     #regexp build with 'man 8 adduser' informations
550     username_re = re.compile(r"^[a-z_][a-z0-9_-]{0,30}[$a-z0-9_-]{0,1}$")
551
552     def _validate(self, value, context=undefined):
553         self._impl_valid_unicode(value)
554         match = self.username_re.search(value)
555         if not match:
556             raise ValueError(_('invalid username'))  # pragma: optional cover
557
558
559 class FilenameOption(Option):
560     __slots__ = tuple()
561     path_re = re.compile(r"^[a-zA-Z0-9\-\._~/+]+$")
562
563     def _validate(self, value, context=undefined):
564         self._impl_valid_unicode(value)
565         match = self.path_re.search(value)
566         if not match:
567             raise ValueError(_('invalid filename'))  # pragma: optional cover