from tiramisu.config import Config
from tiramisu.option import ChoiceOption, BoolOption, IntOption, \
StrOption, OptionDescription
-from test.test_state import _diff_opts, _diff_conf
+from .test_state import _diff_opts, _diff_conf
from py.test import raises
try:
delete_session('config', '29090938')
- except ConfigError:
+ except ValueError:
pass
from tiramisu.option import IntOption, StrOption, UnicodeOption, OptionDescription, SymLinkOption
from tiramisu.error import PropertiesOptionError, ConfigError
from tiramisu.setting import groups
+from tiramisu.storage import delete_session
def make_description():
config.str = 'a'
assert list(config.cfgimpl_get_values().mandatory_warnings()) == ['str1', 'unicode2', 'str3']
assert list(config.cfgimpl_get_values().mandatory_warnings(force_permissive=True)) == ['str1', 'unicode2', 'str3']
+ try:
+ delete_session('config', config.impl_getsessionid())
+ except ValueError:
+ pass
+ del(config)
def test_mandatory_warnings_disabled():
setting[descr.str].append('disabled')
assert list(config.cfgimpl_get_values().mandatory_warnings()) == ['str1', 'unicode2', 'str3']
assert list(config.cfgimpl_get_values().mandatory_warnings(force_permissive=True)) == ['str1', 'unicode2', 'str3']
+ try:
+ delete_session('config', config.impl_getsessionid())
+ except ValueError:
+ pass
+ del(config)
def test_mandatory_warnings_hidden():
setting[descr.str].append('hidden')
assert list(config.cfgimpl_get_values().mandatory_warnings()) == ['str1', 'unicode2', 'str3']
assert list(config.cfgimpl_get_values().mandatory_warnings(force_permissive=True)) == ['str', 'str1', 'unicode2', 'str3']
+ try:
+ delete_session('config', config.impl_getsessionid())
+ except ValueError:
+ pass
+ del(config)
def test_mandatory_warnings_frozen():
setting[descr.str].append('frozen')
config.read_only()
assert list(config.cfgimpl_get_values().mandatory_warnings()) == ['str', 'str1', 'unicode2', 'str3']
+ try:
+ delete_session('config', config.impl_getsessionid())
+ except ValueError:
+ pass
+ del(config)
def test_mandatory_master():
config.read_only()
raises(PropertiesOptionError, 'config.ip_admin_eth0.ip_admin_eth0')
raises(PropertiesOptionError, 'config.ip_admin_eth0.netmask_admin_eth0')
+ try:
+ delete_session('config', config.impl_getsessionid())
+ except ValueError:
+ pass
+ del(config)
def test_mandatory_master_empty():
config.read_only()
assert config.ip_admin_eth0.ip_admin_eth0 == ['ip']
assert config.ip_admin_eth0.netmask_admin_eth0 == [None]
+ try:
+ delete_session('config', config.impl_getsessionid())
+ except ValueError:
+ pass
+ del(config)
def test_mandatory_slave():
config.read_only()
assert config.ip_admin_eth0.ip_admin_eth0 == ['ip']
assert config.ip_admin_eth0.netmask_admin_eth0 == ['ip']
+ try:
+ delete_session('config', config.impl_getsessionid())
+ except ValueError:
+ pass
+ del(config)
def test_mandatory_warnings_symlink():
setting[descr.str].append('frozen')
config.read_only()
assert list(config.cfgimpl_get_values().mandatory_warnings()) == ['str', 'str1', 'str3']
+ try:
+ delete_session('config', config.impl_getsessionid())
+ except ValueError:
+ pass
+ del(config)
def test_mandatory_warnings_validate():
config.str = 'test'
raises(ValueError, "list(config.cfgimpl_get_values().mandatory_warnings())")
assert list(config.cfgimpl_get_values().mandatory_warnings(validate=False)) == ['str1', 'str3']
+ try:
+ delete_session('config', config.impl_getsessionid())
+ except ValueError:
+ pass
+ del(config)
def test_mandatory_warnings_validate_empty():
config.read_only()
raises(ConfigError, "list(config.cfgimpl_get_values().mandatory_warnings())")
assert list(config.cfgimpl_get_values().mandatory_warnings(validate=False)) == ['str', 'str1', 'str3', 'unicode1']
+ try:
+ delete_session('config', config.impl_getsessionid())
+ except ValueError:
+ pass
+ del(config)
def test_mandatory_warnings_requires():
config.read_write()
config.str = 'yes'
assert list(config.cfgimpl_get_values().mandatory_warnings()) == ['str1', 'unicode2', 'str3']
+ try:
+ delete_session('config', config.impl_getsessionid())
+ except ValueError:
+ pass
+ del(config)
def test_mandatory_od_disabled():
assert list(config.cfgimpl_get_values().mandatory_warnings()) == ['tiram.str1', 'tiram.unicode2', 'tiram.str3']
config.cfgimpl_get_settings()[descr].append('disabled')
assert list(config.cfgimpl_get_values().mandatory_warnings()) == []
+ try:
+ delete_session('config', config.impl_getsessionid())
+ except ValueError:
+ pass
+ del(config)
_diff_conf(cfg, q)
try:
delete_session('config', '29090931')
- except ConfigError:
+ except ValueError:
pass
_diff_conf(cfg, q)
try:
delete_session('config', '29090939')
- except ConfigError:
+ except ValueError:
pass
_diff_conf(cfg, q)
try:
delete_session('config', '29090932')
- except ConfigError:
+ except ValueError:
pass
assert q.val1 is False
try:
delete_session('config', '29090933')
- except ConfigError:
+ except ValueError:
pass
assert q.getowner(nval1) == owners.newowner
try:
delete_session('config', '29090934')
- except ConfigError:
+ except ValueError:
pass
delete_session('config', '29090935')
delete_session('config', '29090936')
delete_session('config', '29090937')
- except ConfigError:
+ except ValueError:
pass
delete_session('config', '29090935')
delete_session('config', '29090936')
delete_session('config', '29090937')
- except ConfigError:
+ except ValueError:
pass
pass
else:
assert 'test_persistent' in list_sessions('config')
- delete_session('test_persistent')
+ delete_session('config', 'test_persistent')
assert 'test_persistent' not in list_sessions('config')
o = OptionDescription('od', '', [b])
try:
c = Config(o, session_id='test_persistent', persistent=True)
- c.cfgimpl_get_settings().remove('cache')
except ValueError:
# storage is not persistent
pass
assert c.b is True
del(c)
c = Config(o, session_id='test_persistent', persistent=True)
- c.cfgimpl_get_settings().remove('cache')
assert c.b is True
assert 'test_persistent' in list_sessions('config')
- delete_session('test_persistent')
+ delete_session('config', c.impl_getsessionid())
+ del(c)
c = Config(o, session_id='test_persistent', persistent=True)
- c.cfgimpl_get_settings().remove('cache')
assert c.b is None
- delete_session('test_persistent')
+ delete_session('config', c.impl_getsessionid())
+ del(c)
def test_two_persistent():
o = OptionDescription('od', '', [b])
try:
c = Config(o, session_id='test_persistent', persistent=True)
- c.cfgimpl_get_settings().remove('cache')
except ValueError:
# storage is not persistent
pass
else:
+ c.cfgimpl_get_settings().remove('cache')
c2 = Config(o, session_id='test_persistent', persistent=True)
c2.cfgimpl_get_settings().remove('cache')
assert c.b is None
c.b = False
assert c.b is False
assert c2.b is False
- c.b = True
+ c2.b = True
assert c.b is True
assert c2.b is True
- delete_session('test_persistent')
+ delete_session('config', 'test_persistent')
+
+
+def test_create_persistent_retrieve_owner():
+ b = BoolOption('b', '')
+ o = OptionDescription('od', '', [b])
+ try:
+ c = Config(o, session_id='test_persistent', persistent=True)
+ except ValueError:
+ # storage is not persistent
+ pass
+ else:
+ assert c.getowner(b) == owners.default
+ c.b = True
+ assert c.b is True
+ assert c.getowner(b) == owners.user
+ owners.addowner('persistentowner')
+ c.cfgimpl_get_values().setowner(b, owners.persistentowner)
+ assert c.getowner(b) == owners.persistentowner
+ del(c)
+ #
+ c = Config(o, session_id='test_persistent', persistent=True)
+ c.cfgimpl_get_values().setowner(b, owners.persistentowner)
+ delete_session('config', c.impl_getsessionid())
+ del(c)
+ #
+ c = Config(o, session_id='test_persistent', persistent=True)
+ assert c.b is None
+ assert c.getowner(b) == owners.default
+ delete_session('config', c.impl_getsessionid())
+ del(c)
def test_two_persistent_owner():
else:
c2 = Config(o, session_id='test_persistent', persistent=True)
c2.cfgimpl_get_settings().remove('cache')
- owners.addowner('persistent')
assert c.getowner(b) == owners.default
assert c2.getowner(b) == owners.default
c.b = False
assert c.getowner(b) == owners.user
assert c2.getowner(b) == owners.user
+ owners.addowner('persistent')
c.cfgimpl_get_values().setowner(b, owners.persistent)
assert c.getowner(b) == owners.persistent
assert c2.getowner(b) == owners.persistent
- delete_session('test_persistent')
+ delete_session('config', 'test_persistent')
+
+
+def test_create_persistent_retrieve_information():
+ b = BoolOption('b', '')
+ o = OptionDescription('od', '', [b])
+ try:
+ c = Config(o, session_id='test_persistent', persistent=True)
+ except ValueError:
+ # storage is not persistent
+ pass
+ else:
+ c.impl_set_information('info', 'string')
+ assert c.impl_get_information('info') == 'string'
+ del(c)
+ #
+ c = Config(o, session_id='test_persistent', persistent=True)
+ assert c.impl_get_information('info') == 'string'
+ delete_session('config', c.impl_getsessionid())
+ del(c)
+ #
+ c = Config(o, session_id='test_persistent', persistent=True)
+ assert c.impl_get_information('info', None) == None
+ delete_session('config', c.impl_getsessionid())
+ del(c)
def test_two_persistent_information():
c2.cfgimpl_get_settings().remove('cache')
c2.cfgimpl_get_settings().remove('cache')
assert c2.impl_get_information('info') == 'string'
- delete_session('test_persistent')
+ delete_session('config', 'test_persistent')
self._impl_settings._impl_setstate(storage)
self._impl_meta = None
- def _gen_fake_values(self):
+ def _gen_fake_values(self, session):
fake_config = Config(self._impl_descr, persistent=False,
force_values=get_storages_validation(),
force_settings=self.cfgimpl_get_settings())
- fake_config.cfgimpl_get_values()._p_._values = self.cfgimpl_get_values()._p_._values
+ fake_config.cfgimpl_get_values()._p_.importation(self.cfgimpl_get_values()._p_.exportation(session, fake=True))
return fake_config
def duplicate(self):
def impl_getname(self):
return self._impl_name
+ def impl_getsessionid(self):
+ return self._impl_values._p_._storage.session_id
+
class GroupConfig(_CommonConfig):
__slots__ = ('__weakref__', '_impl_children', '_impl_name')
pass
def getitem(self, values, opt, path, validate, force_permissive,
- trusted_cached_properties, validate_properties, slave_path=undefined,
- slave_value=undefined, setting_properties=undefined,
- self_properties=undefined, index=None,
+ trusted_cached_properties, validate_properties, session,
+ slave_path=undefined, slave_value=undefined,
+ setting_properties=undefined, self_properties=undefined, index=None,
returns_raise=False):
if self.is_master(opt):
return self._getmaster(values, opt, path, validate,
force_permissive,
validate_properties, slave_path,
slave_value, self_properties, index,
- returns_raise, setting_properties)
+ returns_raise, setting_properties, session)
else:
return self._getslave(values, opt, path, validate,
force_permissive, trusted_cached_properties,
validate_properties, setting_properties,
- self_properties, index, returns_raise)
+ self_properties, index, returns_raise,
+ session)
def _getmaster(self, values, opt, path, validate, force_permissive,
validate_properties, c_slave_path,
c_slave_value, self_properties, index, returns_raise,
- setting_properties):
+ setting_properties, session):
value = values._get_cached_value(opt, path=path, validate=validate,
force_permissive=force_permissive,
validate_properties=validate_properties,
masterlen = len(value)
for slave in self.getslaves(opt):
slave_path = slave.impl_getpath(values._getcontext())
- slavelen = values._p_.get_max_length(slave_path)
+ slavelen = values._p_.get_max_length(slave_path, session)
self.validate_slave_length(masterlen, slavelen, slave.impl_getname(), opt)
return value
def _getslave(self, values, opt, path, validate, force_permissive,
trusted_cached_properties, validate_properties, setting_properties,
- self_properties, index, returns_raise):
+ self_properties, index, returns_raise, session):
"""
if master has length 0:
return []
master = self.getmaster(opt)
context = values._getcontext()
masterp = master.impl_getpath(context)
- masterlen = self.get_length(values, opt, validate, undefined,
+ masterlen = self.get_length(values, opt, session, validate, undefined,
undefined, force_permissive,
master=master, returns_raise=returns_raise)
if isinstance(masterlen, Exception):
return masterlen
- master_is_meta = values._is_meta(master, masterp)
+ master_is_meta = values._is_meta(master, masterp, session)
multi = values._get_multi(opt, path)
#if masterlen is [], test properties (has no value, don't get any value)
if masterlen == 0:
raise err
return multi
- def validate(self, values, opt, value, path, returns_raise):
+ def validate(self, values, opt, value, path, returns_raise, session):
if self.is_master(opt):
masterlen = len(value)
#for regen slave path
base_path = '.'.join(path.split('.')[:-1]) + '.'
for slave in self.getslaves(opt):
slave_path = base_path + slave.impl_getname()
- slavelen = values._p_.get_max_length(slave_path)
+ slavelen = values._p_.get_max_length(slave_path, session)
self.validate_slave_length(masterlen, slavelen, slave.impl_getname(), opt)
else:
- val_len = self.get_length(values, opt, slave_path=path, returns_raise=returns_raise)
+ val_len = self.get_length(values, opt, session, slave_path=path, returns_raise=returns_raise)
if isinstance(val_len, Exception):
return val_len
self.validate_slave_length(val_len,
len(value),
opt.impl_getname(), opt, setitem=True)
- def get_length(self, values, opt, validate=True, slave_path=undefined,
+ def get_length(self, values, opt, session, validate=True, slave_path=undefined,
slave_value=undefined, force_permissive=False, master=None,
masterp=None, returns_raise=False):
"""get master len with slave option"""
if slave_value is undefined:
slave_path = undefined
value = self.getitem(values, master, masterp, validate,
- force_permissive, None, True, slave_path,
- slave_value, returns_raise=returns_raise)
+ force_permissive, None, True, session, slave_path=slave_path,
+ slave_value=slave_value, returns_raise=returns_raise)
if isinstance(value, Exception):
return value
return len(value)
'must not be a multi for {1}').format(
require_opt.impl_getname(), option.impl_getname()))
if init:
+ session = config._impl_values._p_.getsession()
if len(cache_option) != len(set(cache_option)):
for idx in xrange(1, len(cache_option) + 1):
opt = cache_option.pop(0)
raise ConfigError(_('a dynoption ({0}) cannot have '
'force_store_value property').format(subpath))
config._impl_values._p_.setvalue(subpath, value,
- owners.forced, None)
+ owners.forced, None, session)
+ del(session)
# ____________________________________________________________
def impl_set_group_type(self, group_type):
session_id = gen_id(context)
imp = storage_type.get()
storage = imp.Storage(session_id, persistent)
- settings = imp.Settings(storage)
+ settings = imp.Settings(session_id, storage)
values = imp.Values(storage)
- try:
- return settings, values
- except Exception as err:
- raise Exception(_('unable to get storages:') + str(err))
+ return settings, values
def get_storages_option(type_):
:params session_id: id of session to delete
"""
if type_ == 'option':
- return storage_option_type.get().delete_session(session_id)
+ storage_option_type.get().delete_session(session_id)
else:
- return storage_type.get().delete_session(session_id)
+ storage_module = storage_type.get()
+ session = storage_module.storage.getsession()
+ storage_module.value.delete_session(session_id, session)
+ storage_module.storage.delete_session(session_id, session)
+ session.commit()
+ del(session)
__all__ = (set_storage, list_sessions, delete_session)
class Settings(Cache):
__slots__ = ('_properties', '_permissives')
- def __init__(self, storage):
+ def __init__(self, session_id, storage):
# properties attribute: the name of a property enables this property
# key is None for global properties
self._properties = {}
_list_sessions.remove(self.session_id)
except AttributeError: # pragma: optional cover
pass
+
+
+def getsession():
+ pass
def __init__(self, storage):
"""init plugin means create values storage
"""
+ #(('path1',), (index1,), (value1,), ('owner1'))
self._values = (tuple(), tuple(), tuple(), tuple())
self._informations = {}
# should init cache too
super(Values, self).__init__(storage)
+ def getsession(self):
+ pass
+
+ def delsession(self, session):
+ pass
+
def _setvalue_info(self, nb, idx, value, values, index, vidx):
lst = list(self._values[nb])
if idx is None:
tval[vidx] = value
lst[idx] = tuple(tval)
lst[idx] = tuple(lst[idx])
- for ls in lst:
- if isinstance(ls, list):
- raise Exception('pouet')
values.append(tuple(lst))
return vidx
# value
- def setvalue(self, path, value, owner, index):
+ def setvalue(self, path, value, owner, index, session):
"""set value for a path
a specified value must be associated to an owner
"""
self._setvalue_info(3, idx, owner, values, index, vidx)
self._values = tuple(values)
- def getvalue(self, path, index=None):
+ def getvalue(self, path, session, index=None):
"""get value for a path
return: only value, not the owner
"""
"""
return path in self._values[0]
- def resetvalue(self, path):
+ def resetvalue(self, path, session):
"""remove value means delete value in storage
"""
def _resetvalue(nb):
return values
# owner
- def setowner(self, path, owner, index=None):
+ def setowner(self, path, owner, session, index=None):
"""change owner for a path
"""
idx = self._values[0].index(path)
lst[3] = tuple(values[0])
self._values = tuple(lst)
- def get_max_length(self, path):
+ def get_max_length(self, path, session):
if path in self._values[0]:
idx = self._values[0].index(path)
else:
return 0
return max(self._values[1][idx]) + 1
- def getowner(self, path, default, index=None, only_default=False):
+ def getowner(self, path, default, session, index=None, only_default=False):
"""get owner for a path
return: owner object
"""
if index is None:
if only_default:
if path in self._values[0]:
- return None
+ return undefined
else:
return default
val = self._getvalue(path, 3, index)
- if val is None:
+ if val is undefined:
return default
return val
else:
value = self._getvalue(path, 3, index)
- if value is None:
+ if value is undefined:
return default
else:
return value
subidx = self._values[1][idx].index(index)
value = self._values[nb][idx][subidx]
else:
- value = None
+ value = undefined
else:
value = []
for i in xrange(0, max(self._values[1][idx])):
else:
value.append(undefined)
else:
- value = None
+ value = undefined
if isinstance(value, tuple):
value = list(value)
return value
raise ValueError(_("information's item"
" not found: {0}").format(key))
return value
+
+ def exportation(self, session, fake=False):
+ return self._values
+
+ def importation(self, export):
+ self._values = export
+
+def delete_session(session_id, session):
+ raise ValueError(_('a dictionary cannot be persistent'))
"""
from .value import Values
from .setting import Settings
-from .storage import Storage, list_sessions, delete_session, setting
+from .storage import Storage, list_sessions, delete_session, storage_setting
from .option import StorageBase, StorageOptionDescription
from .util import load
load()
-__all__ = (setting, Values, Settings, Storage, list_sessions, delete_session,
+__all__ = (storage_setting, Values, Settings, Storage, list_sessions, delete_session,
StorageBase, StorageOptionDescription)
# Base, OptionDescription)
class Settings(Cache, SqlAlchemyBase):
__tablename__ = 'settings'
id = Column(Integer, primary_key=True)
+ session_id = Column(String, index=True)
_props = relationship("_Property",
collection_class=attribute_mapped_collection('path'),
cascade="all, delete-orphan")
cascade="all, delete-orphan")
_permissives = association_proxy("_perms", "permissives")
- #def __init__(self, storage):
- # super(Settings, self).__init__(storage)
+ def __init__(self, session_id, storage):
+ session = self.getsession()
+ self.session_id = session_id
+ super(Settings, self).__init__(storage)
+ session.commit()
+
+ def getsession(self):
+ return util.Session()
# properties
def setproperties(self, path, properties):
+ session = self.getsession()
self._properties[path] = properties
- util.session.commit()
+ session.commit()
+ del(session)
def getproperties(self, path, default_properties):
return self._properties.get(path, set(default_properties))
return path in self._properties
def reset_all_properties(self):
+ session = self.getsession()
self._properties.clear()
- util.session.commit()
+ session.commit()
+ del(session)
def delproperties(self, path):
try:
+ session = self.getsession()
del(self._properties[path])
- util.session.commit()
+ session.commit()
+ del(session)
except KeyError:
pass
# permissive
def setpermissive(self, path, permissive):
+ session = self.getsession()
self._permissives[path] = frozenset(permissive)
- util.session.commit()
+ session.commit()
def getpermissive(self, path=None):
ret = self._permissives.get(path, frozenset())
example: {'path1': set(['perm1', 'perm2'])}
"""
return self._permissives
+
+
+def delete_session(session_id, session):
+ print session.query(_Property).all()
+ print session.query(_Permissive).all()
+ settings_id = session.query(Settings).filter_by(session_id=session_id).first().id
+ for val in session.query(_Property).filter_by(settings=settings_id).all():
+ session.delete(val)
+ for val in session.query(_Permissive).filter_by(settings=settings_id).all():
+ session.delete(val)
dir_database = '/tmp'
-setting = Setting()
+storage_setting = Setting()
class Session(SqlAlchemyBase):
def list_sessions(): # pragma: optional cover
+ session = util.Session()
ret = []
- for val in util.session.query(Session).all():
+ for val in session.query(Session).all():
ret.append(val.session)
+ del(session)
return ret
-def delete_session(session_id): # pragma: optional cover
- #Must remove all values for this session!
- util.session.delete(util.session.query(Session).filter_by(session=session_id).first())
- util.session.commit()
+def delete_session(session_id, session): # pragma: optional cover
+ session.delete(session.query(Session).filter_by(session=session_id).first())
+ session.commit()
+
+
+def getsession():
+ return util.Session()
class Storage(object):
serializable = True
def __init__(self, session_id, persistent, test=False):
- if util.session.query(Session).filter_by(session=session_id).first(): # pragma: optional cover
- raise ValueError(_('session already used'))
+ session = getsession()
self.session_id = session_id
self.persistent = persistent
- util.session.add(Session(session_id))
- util.session.commit()
+ if not session.query(Session).filter_by(session=session_id).first(): # pragma: optional cover
+ session.add(Session(session_id))
+ session.commit()
+ del(session)
def __del__(self):
- delete_session(self.session_id)
+ if not self.persistent:
+ session = getsession()
+ delete_session(self.session_id, session)
+ del(session)
engine = create_engine('sqlite:///:memory:')
SqlAlchemyBase = declarative_base()
-global session
-session = None
+global session, Session
+Session = None
def load():
- global session
- if session is None:
+ global session, Session
+ if Session is None:
#engine.echo = True
#print SqlAlchemyBase.metadata.tables.keys()
SqlAlchemyBase.metadata.create_all(engine)
- Session = sessionmaker(bind=engine)
- session = Session()
+ Session = sessionmaker(bind=engine, expire_on_commit=False)
from ..util import Cache
from .util import SqlAlchemyBase
import util
+from ...setting import undefined
from sqlalchemy import Column, Integer, String, PickleType
+from sqlalchemy import func
from tiramisu.setting import owners
class _Vinformation(SqlAlchemyBase):
__tablename__ = 'vinformation'
id = Column(Integer, primary_key=True)
- session = Column(String, index=True)
+ session_id = Column(String, index=True)
path = Column(String, index=True)
key = Column(String)
value = Column(PickleType)
- def __init__(self, session, key, value):
- self.session = session
+ def __init__(self, session_id, key, value):
+ self.session_id = session_id
self.key = key
self.value = value
class Value(SqlAlchemyBase):
__tablename__ = 'value'
id = Column(Integer, primary_key=True)
- session = Column(String, index=True)
+ session_id = Column(String, index=True)
path = Column(String, index=True)
key = Column(String)
value = Column(PickleType)
owner = Column(String)
+ indx = Column(Integer, index=True)
- def __init__(self, session, path, value, owner):
- self.session = session
+ def __init__(self, session_id, path, value, owner, index):
+ self.session_id = session_id
self.path = path
self.value = value
self.owner = owner
+ self.indx = index
class Values(Cache):
+ def getsession(self):
+ return util.Session()
+
# value
- def setvalue(self, path, value, owner):
+ def setvalue(self, path, value, owner, index, session):
"""set value for a path
a specified value must be associated to an owner
"""
- val = util.session.query(Value).filter_by(
- path=path, session=self._storage.session_id).first()
+ #if it's a multi
+ if isinstance(value, list):
+ value = list(value)
+ val = session.query(Value).filter_by(
+ path=path, indx=index, session_id=self._storage.session_id).first()
if val is None:
- util.session.add(Value(self._storage.session_id, path, value,
- owner))
+ session.add(Value(self._storage.session_id, path, value,
+ owner, index))
else:
val.value = value
val.owner = owner
- util.session.commit()
+ session.commit()
- def getvalue(self, path):
+ def getvalue(self, path, session, index=None):
"""get value for a path
return: only value, not the owner
"""
- val = util.session.query(Value).filter_by(
- path=path, session=self._storage.session_id).first()
+ val = session.query(Value).filter_by(
+ path=path, indx=index, session_id=self._storage.session_id).first()
if not val:
raise KeyError('no value found')
return val.value
- def hasvalue(self, path):
+ def hasvalue(self, path, session):
"""if path has a value
return: boolean
"""
- return util.session.query(Value).filter_by(
- path=path, session=self._storage.session_id).first() is not None
+ return session.query(Value).filter_by(
+ path=path, session_id=self._storage.session_id).first() is not None
- def resetvalue(self, path):
+ def resetvalue(self, path, session):
"""remove value means delete value in storage
"""
- val = util.session.query(Value).filter_by(
- path=path, session=self._storage.session_id).first()
- if val is not None:
- util.session.delete(val)
- util.session.commit()
+ vals = session.query(Value).filter_by(
+ path=path, session_id=self._storage.session_id).all()
+ if vals != []:
+ for val in vals:
+ session.delete(val)
+ session.commit()
def get_modified_values(self):
"""return all values in a dictionary
example: {'path1': (owner, 'value1'), 'path2': (owner, 'value2')}
"""
+ session = self.getsession()
ret = {}
- for val in util.session.query(Value).filter_by(
- session=self._storage.session_id).all():
- ret[val.path] = (val.owner, val.value)
+ for val in session.query(Value).filter_by(
+ session_id=self._storage.session_id).all():
+ value = val.value
+ if isinstance(val.value, list):
+ value = tuple(val.value)
+ ret[val.path] = (val.owner, value)
+ del(session)
return ret
# owner
- def setowner(self, path, owner):
+ def setowner(self, path, owner, session, index=None):
"""change owner for a path
"""
- val = util.session.query(Value).filter_by(
- path=path, session=self._storage.session_id).first()
+ val = session.query(Value).filter_by(
+ path=path, indx=index, session_id=self._storage.session_id).first()
if val is None:
raise KeyError('no value found')
else:
val.owner = owner
- util.session.commit()
+ session.commit()
+
+ def get_max_length(self, path, session):
+ val = session.query(Value, func.max(Value.indx)).filter_by(
+ path=path, session_id=self._storage.session_id).first()
+ if val[1] is None:
+ maxval = 0
+ else:
+ maxval = val[1] + 1
+ return maxval
- def getowner(self, path, default):
+ def getowner(self, path, default, session, index=None, only_default=False):
+ #FIXME support de only_default
"""get owner for a path
return: owner object
"""
- val = util.session.query(Value).filter_by(
- path=path, session=self._storage.session_id).first()
+ session.commit()
+ val = session.query(Value).filter_by(
+ path=path, session_id=self._storage.session_id,
+ indx=index).first()
if val is None:
return default
else:
:param key: information's key (ex: "help", "doc"
:param value: information's value (ex: "the help string")
"""
- pass
- val = util.session.query(_Vinformation).filter_by(
- key=key, session=self._storage.session_id).first()
+ session = self.getsession()
+ val = session.query(_Vinformation).filter_by(
+ key=key, session_id=self._storage.session_id).first()
if val is None:
- util.session.add(_Vinformation(self._storage.session_id, key,
+ session.add(_Vinformation(self._storage.session_id, key,
value))
else:
val.value = value
- util.session.commit()
+ session.commit()
+ del(session)
- def get_information(self, key):
+ def get_information(self, key, default):
"""retrieves one information's item
:param key: the item string (ex: "help")
"""
- val = util.session.query(_Vinformation).filter_by(
- key=key, session=self._storage.session_id).first()
+ session = self.getsession()
+ val = session.query(_Vinformation).filter_by(
+ key=key, session_id=self._storage.session_id).first()
+ del(session)
if not val:
+ if default is not undefined:
+ return default
raise ValueError("not found")
return val.value
+
+ def exportation(self, session, fake=False):
+ if fake:
+ #(('path1',), (index1,), (value1,), ('owner1'))
+ paths = []
+ indexes = []
+ values = []
+ owners_ = []
+ slaves = {}
+ for val in session.query(Value).filter_by(
+ session_id=self._storage.session_id).all():
+ if val.indx is not None:
+ slaves.setdefault(val.path, []).append((val.indx, val.value, getattr(owners, val.owner)))
+ else:
+ paths.append(val.path)
+ indexes.append(val.indx)
+ values.append(val.value)
+ owners_.append(getattr(owners, val.owner))
+ for path, vals in slaves.items():
+ paths.append(path)
+ t_idxes = []
+ t_vals = []
+ t_owners = []
+ for val in vals:
+ t_idxes.append(val[0])
+ t_vals.append(val[1])
+ t_owners.append(val[2])
+ indexes.append(tuple(t_idxes))
+ values.append(t_vals)
+ owners_.append(t_owners)
+ return (paths, indexes, values, owners_)
+ pass
+
+ def importation(self, value):
+ pass
+
+
+def delete_session(session_id, session):
+ for val in session.query(_Vinformation).filter_by(session_id=session_id).all():
+ session.delete(val)
+ for val in session.query(Value).filter_by(session_id=session_id).all():
+ session.delete(val)
return value
def _getvalue(self, opt, path, self_properties, index, submulti_index,
- with_meta, masterlen):
+ with_meta, masterlen, session):
"""actually retrieves the value
:param opt: the `option.Option()` object
force_default = 'frozen' in self_properties and \
'force_default_on_freeze' in self_properties
# not default value
- is_default = self._is_default_owner(opt, path,
+ is_default = self._is_default_owner(opt, path, session,
validate_properties=False,
validate_meta=False,
self_properties=self_properties,
index=index)
if not is_default and not force_default:
if opt.impl_is_master_slaves('slave'):
- return self._p_.getvalue(path, index)
+ return self._p_.getvalue(path, session, index)
else:
- value = self._p_.getvalue(path)
+ value = self._p_.getvalue(path, session)
if index is not None:
if len(value) > index:
return value[index]
path = opt.impl_getpath(self._getcontext())
return self._contains(path)
- def _contains(self, path):
- return self._p_.hasvalue(path)
+ def _contains(self, path, session=None):
+ if session is None:
+ session = self._p_.getsession()
+ return self._p_.hasvalue(path, session)
def __delitem__(self, opt):
"""overrides the builtins `del()` instructions"""
path = opt.impl_getpath(context)
if _setting_properties is None:
_setting_properties = setting._getproperties(read_write=False)
- hasvalue = self._contains(path)
+ session = self._p_.getsession()
+ hasvalue = self._contains(path, session)
if validate and hasvalue and 'validator' in _setting_properties:
- fake_context = context._gen_fake_values()
+ session = context.cfgimpl_get_values()._p_.getsession()
+ fake_context = context._gen_fake_values(session)
fake_value = fake_context.cfgimpl_get_values()
fake_value.reset(opt, path, validate=False)
fake_value._get_cached_value(opt, path,
value = self._getdefaultvalue(opt, path, True, undefined, undefined, False)
self._setvalue(opt, path, value, force_owner=owners.forced)
else:
- self._p_.resetvalue(path)
+ self._p_.resetvalue(path, session)
context.cfgimpl_reset_cache()
def _isempty(self, opt, value, force_allow_empty_list=False, index=None):
setting_properties=undefined, self_properties=undefined,
index=None, submulti_index=undefined, from_masterslave=False,
with_meta=True, masterlen=undefined, check_frozen=False,
- returns_raise=False):
+ returns_raise=False, session=None):
context = self._getcontext()
settings = context.cfgimpl_get_settings()
if path is None:
else:
raise props
return value
+ if session is None:
+ session = self._p_.getsession()
if not from_masterslave and opt.impl_is_master_slaves():
val = opt.impl_get_master_slaves().getitem(self, opt, path,
validate,
force_permissive,
trusted_cached_properties,
validate_properties,
+ session,
setting_properties=setting_properties,
index=index,
self_properties=self_properties,
index=index,
submulti_index=submulti_index,
check_frozen=check_frozen,
- returns_raise=returns_raise)
+ returns_raise=returns_raise,
+ session=session)
if isinstance(val, Exception):
if returns_raise:
return val
index=None, submulti_index=undefined,
with_meta=True, setting_properties=undefined,
self_properties=undefined, masterlen=undefined,
- check_frozen=False, returns_raise=False):
+ check_frozen=False, returns_raise=False,
+ session=None):
"""same has getitem but don't touch the cache
index is None for slave value, if value returned is not a list, just return []
"""
if self_properties is undefined:
self_properties = setting._getproperties(opt, path, read_write=False, index=index)
config_error = None
+ if session is None:
+ session = self._p_.getsession()
value = self._getvalue(opt, path, self_properties, index, submulti_index,
- with_meta, masterlen)
+ with_meta, masterlen, session)
if isinstance(value, Exception):
if isinstance(value, ConfigError):
# For calculating properties, we need value (ie for mandatory
context = self._getcontext()
setting_properties = context.cfgimpl_get_settings()._getproperties(read_write=False)
if 'validator' in setting_properties:
- fake_context = context._gen_fake_values()
+ session = context.cfgimpl_get_values()._p_.getsession()
+ fake_context = context._gen_fake_values(session)
fake_values = fake_context.cfgimpl_get_values()
fake_values._setvalue(opt, path, value)
- props = fake_values.validate(opt, value, path, check_frozen,
- force_permissive, setting_properties,
- not_raises=not_raises)
+ props = fake_values.validate(opt, value, path,
+ check_frozen=check_frozen,
+ force_permissive=force_permissive,
+ setting_properties=setting_properties,
+ session=session, not_raises=not_raises)
if props and not_raises:
return
err = opt.impl_validate(value, fake_context)
for idx, val in enumerate(value):
if isinstance(val, SubMulti):
value[idx] = list(val)
+ session = self._p_.getsession()
#FIXME pourquoi là et pas dans masterslaves ??
if opt.impl_is_master_slaves('slave'):
if index is not None:
- self._p_.setvalue(path, value[index], owner, index)
+ self._p_.setvalue(path, value[index], owner, index, session)
else:
- self._p_.resetvalue(path)
+ self._p_.resetvalue(path, session)
for idx, val in enumerate(value):
- self._p_.setvalue(path, val, owner, idx)
+ self._p_.setvalue(path, val, owner, idx, session)
else:
- self._p_.setvalue(path, value, owner, None)
+ self._p_.setvalue(path, value, owner, None, session)
+ del(session)
def validate(self, opt, value, path, check_frozen=True, force_permissive=False,
setting_properties=undefined, valid_masterslave=True,
- not_raises=False, returns_raise=False):
+ not_raises=False, returns_raise=False, session=None):
if valid_masterslave and opt.impl_is_master_slaves():
- opt.impl_get_master_slaves().validate(self, opt, value, path, returns_raise)
+ if session is None:
+ session = self._p_.getsession()
+ opt.impl_get_master_slaves().validate(self, opt, value, path, returns_raise, session)
props = self._getcontext().cfgimpl_get_settings().validate_properties(opt,
False,
check_frozen,
return props
raise props
- def _is_meta(self, opt, path):
+ def _is_meta(self, opt, path, session):
context = self._getcontext()
setting = context.cfgimpl_get_settings()
self_properties = setting._getproperties(opt, path, read_write=False)
if 'frozen' in self_properties and 'force_default_on_freeze' in self_properties:
return False
- if self._p_.getowner(path, owners.default, only_default=True) is not owners.default:
+ if self._p_.getowner(path, owners.default, session, only_default=True) is not owners.default:
return False
if context.cfgimpl_get_meta() is not None:
return True
return False
- def getowner(self, opt, index=None, force_permissive=False):
+ def getowner(self, opt, index=None, force_permissive=False, session=None):
"""
retrieves the option's owner
not isinstance(opt, DynSymLinkOption):
opt = opt._impl_getopt()
path = opt.impl_getpath(self._getcontext())
- return self._getowner(opt, path, index=index, force_permissive=force_permissive)
+ return self._getowner(opt, path, session, index=index, force_permissive=force_permissive)
- def _getowner(self, opt, path, validate_properties=True,
+ def _getowner(self, opt, path, session, validate_properties=True,
force_permissive=False, validate_meta=undefined,
self_properties=undefined, only_default=False,
index=None):
"""get owner of an option
"""
+ if session is None:
+ session = self._p_.getsession()
if not isinstance(opt, Option) and not isinstance(opt,
DynSymLinkOption):
raise ConfigError(_('owner only avalaible for an option'))
return owners.default
if validate_properties:
self._get_cached_value(opt, path, True, force_permissive, None, True,
- self_properties=self_properties)
- owner = self._p_.getowner(path, owners.default, only_default=only_default, index=index)
+ self_properties=self_properties, session=session)
+ owner = self._p_.getowner(path, owners.default, session, only_default=only_default, index=index)
if validate_meta is undefined:
if opt.impl_is_master_slaves('slave'):
master = opt.impl_get_master_slaves().getmaster(opt)
masterp = master.impl_getpath(context)
- validate_meta = self._is_meta(opt, masterp)
+ validate_meta = self._is_meta(opt, masterp, session)
else:
validate_meta = True
if validate_meta:
meta = context.cfgimpl_get_meta()
if owner is owners.default and meta is not None:
- owner = meta.cfgimpl_get_values()._getowner(opt, path,
+ owner = meta.cfgimpl_get_values()._getowner(opt, path, session,
validate_properties=validate_properties,
force_permissive=force_permissive,
self_properties=self_properties,
raise TypeError(_("invalid generic owner {0}").format(str(owner)))
path = opt.impl_getpath(self._getcontext())
- if not self._p_.hasvalue(path): # pragma: optional cover
+ session = self._p_.getsession()
+ if not self._p_.hasvalue(path, session): # pragma: optional cover
raise ConfigError(_('no value for {0} cannot change owner to {1}'
'').format(path, owner))
props = self._getcontext().cfgimpl_get_settings().validate_properties(opt,
index=index)
if props:
raise props
- self._p_.setowner(path, owner, index=index)
+ self._p_.setowner(path, owner, session, index=index)
def is_default_owner(self, opt, validate_properties=True,
validate_meta=True, index=None,
:return: boolean
"""
path = opt.impl_getpath(self._getcontext())
- return self._is_default_owner(opt, path,
+ return self._is_default_owner(opt, path, session=None,
validate_properties=validate_properties,
validate_meta=validate_meta, index=index,
force_permissive=force_permissive)
- def _is_default_owner(self, opt, path, validate_properties=True,
+ def _is_default_owner(self, opt, path, session, validate_properties=True,
validate_meta=True, self_properties=undefined,
index=None, force_permissive=False):
if not opt.impl_is_master_slaves('slave'):
index = None
- d = self._getowner(opt, path, validate_properties,
+ d = self._getowner(opt, path, session, validate_properties=validate_properties,
validate_meta=validate_meta,
self_properties=self_properties, only_default=True,
index=index, force_permissive=force_permissive)
if index < 0:
index = self.__len__() + index
if 'validator' in setting_properties and validate:
- fake_context = context._gen_fake_values()
+ session = context.cfgimpl_get_values()._p_.getsession()
+ fake_context = context._gen_fake_values(session)
fake_multi = fake_context.cfgimpl_get_values()._get_cached_value(
self.opt, path=self.path, validate=False)
fake_multi._setitem(index, value, validate=False)
setting = context.cfgimpl_get_settings()
setting_properties = setting._getproperties(read_write=False)
if 'validator' in setting_properties:
- fake_context = context._gen_fake_values()
+ session = context.cfgimpl_get_values()._p_.getsession()
+ fake_context = context._gen_fake_values(session)
fake_multi = fake_context.cfgimpl_get_values()._get_cached_value(
self.opt, path=self.path, validate=False,
force_permissive=force_permissive)
setting = setting = context.cfgimpl_get_settings()
setting_properties = setting._getproperties(read_write=False)
if 'validator' in setting_properties and validate and value is not None:
- fake_context = context._gen_fake_values()
+ session = context.cfgimpl_get_values()._p_.getsession()
+ fake_context = context._gen_fake_values(session)
fake_multi = fake_context.cfgimpl_get_values()._get_cached_value(
self.opt, path=self.path, validate=False)
fake_multi.insert(index, value, validate=False)
setting = context.cfgimpl_get_settings()
setting_properties = setting._getproperties(read_write=False)
if 'validator' in setting_properties and validate:
- fake_context = context._gen_fake_values()
+ session = context.cfgimpl_get_values()._p_.getsession()
+ fake_context = context._gen_fake_values(session)
fake_multi = fake_context.cfgimpl_get_values()._get_cached_value(
self.opt, path=self.path, validate=False)
fake_multi.extend(iterable, validate=False)