Module Gnumed.pycommon.gmCfgINI

GNUmed INI style configuration handling.

On Thu, May 22, 2025 at 11:45:31PM -0400, Grant Edwards wrote via Python-list:

There is sort of a traditional set of locations where applications look to find a config file:

$HOME/<appname> $HOME/.config/<appname> $HOME/.config/<appname>/<filename> /etc/<appname> /etc/<appname>/<filename> /usr/local/etc/<appname> /usr/local/etc/<appname>/<filename>

The last two override all of the others.

Config files that reside in $HOME/ usually start with a dot. Often they end in 'rc'. Config files in other directories usually don't start with a dot.

There's usually an <appname> directory only when an app needs multiple config files. If an app only has one config file, tradition is that you don't need a directory for it.

Many applications will parse two config files: a global one from /etc or /usr/local/ and a user-one from somewhere under $HOME.

Functions

def parse_INI_stream(stream=None, encoding=None)
Expand source code
def parse_INI_stream(stream=None, encoding=None):
        """Parse an iterable of lines for INI-style data.

        Args:
                stream: iterable yielding the lines, either str or bytes
                encoding: used for decoding bytes lines, defaults to 'utf8'

        Returns:
                A flat dict mapping '<group>::<option>' to the value
                of the option: a string for plain options, a list of
                strings for list options. A list option starts with
                a line "<option> = $<option>$" and ends with a line
                consisting of "$<option>$" only.

        Raises:
                ValueError: group line not ending in "]", empty group
                        name, empty option name, or option line without
                        a name/value separator (= or :)
                SyntaxError: stream ended while still inside a list
        """
        _log.debug('parsing INI-style data stream [%s] using [%s]', stream, encoding)

        if encoding is None:
                encoding = 'utf8'

        data = {}
        current_group = None
        current_option = None
        current_option_path = None
        inside_list = False

        for line in stream:
                if isinstance(line, bytes):
                        line = line.decode(encoding)
                # drop CR/LF wherever they occur, then surrounding whitespace
                line = line.replace('\015', '').replace('\012', '').strip()

                # list items are taken verbatim: inside a list neither
                # empty lines nor comment markers are treated specially
                if inside_list:
                        if line == '$%s$' % current_option:             # end of list
                                inside_list = False
                                continue
                        data[current_option_path].append(line)
                        continue

                # noise
                if line == '' or line.startswith(('#', ';')):
                        continue

                # group
                if line.startswith('['):
                        if not line.endswith(']'):
                                _log.error('group line does not end in "]", aborting')
                                _log.error(line)
                                raise ValueError('INI-stream parsing error')
                        group = line.strip('[]').strip()
                        if group == '':
                                _log.error('group name is empty, aborting')
                                _log.error(line)
                                raise ValueError('INI-stream parsing error')
                        current_group = group
                        continue

                # option
                if current_group is None:
                        _log.warning('option found before first group, ignoring')
                        _log.error(line)
                        continue

                # without a separator the split yields one part only, which
                # must be detected *before* unpacking into name/remainder
                parts = regex.split(r'\s*[=:]\s*', line, maxsplit = 1)
                if len(parts) != 2:
                        _log.error('missing name/value separator (= or :), aborting')
                        _log.error(line)
                        raise ValueError('INI-stream parsing error')
                name, remainder = parts
                if name == '':
                        _log.error('option name empty, aborting')
                        _log.error(line)
                        raise ValueError('INI-stream parsing error')

                current_option = name
                current_option_path = '%s::%s' % (current_group, current_option)
                if current_option_path in data:
                        # the later definition wins
                        _log.warning('duplicate option [%s]', current_option_path)

                # anything following a "#" is a trailing comment
                value = remainder.split('#', 1)[0].strip()

                # start of list ?
                if value == '$%s$' % current_option:
                        inside_list = True
                        data[current_option_path] = []
                        continue

                data[current_option_path] = value

        if inside_list:
                _log.critical('unclosed list $%s$ detected at end of config stream [%s]', current_option, stream)
                raise SyntaxError('end of config stream but still in list')

        return data

Parse an iterable for INI-style data.

Returns a flat dict mapping "group::option" to the option value (a string, or a list of strings for list options).

def set_option_in_INI_file(filename=None, group=None, option=None, value=None, encoding='utf8')
Expand source code
def set_option_in_INI_file(filename=None, group=None, option=None, value=None, encoding='utf8'):
        """Set <option> in <group> to <value> inside the INI file <filename>.

        The file is rewritten into a temporary copy which then
        replaces the original by way of shutil.copy2(). The actual
        rewriting is delegated to __set_list_in_INI_file() (when
        <value> is a list) or __set_opt_in_INI_file() (otherwise),
        both of which are defined elsewhere in this module.

        Args:
                filename: the INI file to modify
                group: name of the group holding the option
                option: name of the option
                value: new value, a list makes this a list option
                encoding: used for reading and writing the file
        """
        import os

        _log.debug('setting option "%s" to "%s" in group [%s]', option, value, group)
        _log.debug('file: %s (%s)', filename, encoding)

        # Work inside a private temporary directory rather than creating,
        # deleting, and re-opening a temporary file by name: the latter
        # leaves a window during which someone else can claim the name.
        # The directory (and the temporary copy inside) is removed on
        # exit, even if rewriting fails.
        with tempfile.TemporaryDirectory(prefix = 'gm-cfg-') as sink_dir:
                sink_name = os.path.join(sink_dir, 'set_option.cfg')
                # context managers close both files even if a helper raises
                with open(filename, mode = 'rt', encoding = encoding) as src, open(sink_name, mode = 'wt', encoding = encoding) as sink:
                        # is value a list ?
                        if isinstance(value, list):
                                __set_list_in_INI_file(src, sink, group, option, value)
                        else:
                                __set_opt_in_INI_file(src, sink, group, option, value)
                # both files are closed (and thus flushed) by now
                shutil.copy2(sink_name, filename)

Classes

class gmCfgData
Expand source code
class gmCfgData(gmBorg.cBorg):
        """Configuration data collected from several named sources.

        Each source (a file, a stream, the command line, or the
        in-memory 'internal' source) holds a flat dict mapping
        '<group>::<option>' to a value. The names of the files
        backing file sources are tracked in .source_files.

        Derives from gmBorg.cBorg (defined elsewhere).
        """

        def __init__(self):
                # only set up the containers if they do not exist yet
                # NOTE(review): presumably state is shared among instances via cBorg - confirm
                try:
                        self.__cfg_data
                except AttributeError:
                        self.__cfg_data = {}
                        self.source_files = {}

        #--------------------------------------------------
        def get(self, group=None, option=None, source_order=None):
                """Get the value of a configuration option.

                <group> group to look in, if None the name of the
                        source being searched is used as the group
                <source_order> the order in which sources are searched
                        a list of tuples (source, policy), defaults
                        to [('internal', 'return')]
                        policy:
                                return: return only this value immediately
                                append: append to list of potential values to return
                                extend: if the value per source happens to be a list
                                        extend (rather than append to) the result list

                Unknown sources are logged and skipped.

                returns None when there's no value for an option
                """
                if source_order is None:
                        source_order = [('internal', 'return')]
                results = []
                for source, policy in source_order:
                        # Resolve the default group per source. Do not overwrite
                        # <group> itself or else the first source's name would be
                        # used as the group for all the following sources, too.
                        effective_group = source if group is None else group
                        _log.debug('searching "%s" in [%s] in %s', option, effective_group, source)
                        option_path = '%s::%s' % (effective_group, option)
                        try:
                                source_data = self.__cfg_data[source]
                        except KeyError:
                                _log.error('invalid config source [%s]', source)
                                _log.debug('currently known sources: %s', list(self.__cfg_data))
                                continue

                        try:
                                value = source_data[option_path]
                        except KeyError:
                                _log.debug('option [%s] not in group [%s] in source [%s]', option, effective_group, source)
                                continue
                        _log.debug('option [%s] found in source [%s]', option_path, source)

                        if policy == 'return':
                                return value

                        if policy == 'extend' and isinstance(value, list):
                                results.extend(value)
                        else:
                                # 'append', unknown policies, and non-list values under 'extend'
                                results.append(value)

                if not results:
                        return None

                return results

        #--------------------------------------------------
        def set_option(self, option=None, value=None, group=None, source=None):
                """Set a particular option to a particular value.

                <source> defaults to 'internal', <group> defaults to
                the name of the source. A source not yet known is
                created on the fly.

                Raises ValueError if <option> or <value> is None.

                Note that this does NOT PERSIST the option anywhere !
                """
                if option is None or value is None:
                        raise ValueError('neither <option> nor <value> can be None')
                if source is None:
                        source = 'internal'
                if group is None:
                        group = source
                option_path = '%s::%s' % (group, option)
                # setdefault() so that an explicitly named but as yet
                # unknown source does not fail with a KeyError
                self.__cfg_data.setdefault(source, {})[option_path] = value
        #--------------------------------------------------
        # API: source related
        #--------------------------------------------------
        def add_stream_source(self, source=None, stream=None, encoding=None):
                """Parse <stream> as INI data and store the result as <source>.

                Data of an already existing <source> is replaced.
                """
                data = parse_INI_stream(stream = stream, encoding = encoding)
                if source in self.__cfg_data:
                        _log.warning('overriding source <%s> with [%s]', source, stream)

                self.__cfg_data[source] = data
        #--------------------------------------------------
        def add_file_source(self, source=None, filename=None, encoding='utf8'):
                """Add a source (a file) to the instance.

                If <filename> is None or cannot be opened the source
                is registered as an empty dummy source without a file.
                """
                _log.info('file source "%s": %s (%s)', source, filename, encoding)

                for existing_source, existing_file in self.source_files.items():
                        if existing_file == filename:
                                if source != existing_source:
                                        _log.warning('file [%s] already known as source [%s]', filename, existing_source)
                                        _log.warning('adding it as source [%s] may provoke trouble', source)

                cfg_file = None
                if filename is not None:
                        try:
                                cfg_file = open(filename, mode = 'rt', encoding = encoding)
                        except IOError:
                                _log.error('cannot open [%s], keeping as dummy source', filename)

                if cfg_file is None:
                        filename = None
                        if source in self.__cfg_data:
                                _log.warning('overriding source <%s> with dummy', source)
                        self.__cfg_data[source] = {}
                else:
                        # make sure the file gets closed even if parsing fails
                        with cfg_file:
                                self.add_stream_source(source = source, stream = cfg_file)

                self.source_files[source] = filename

        #--------------------------------------------------
        def remove_source(self, source):
                """Remove a source from the instance."""

                _log.info('removing source <%s>', source)

                try:
                        del self.__cfg_data[source]
                except KeyError:
                        _log.warning("source <%s> doesn't exist", source)

                # sources need not be backed by a file
                self.source_files.pop(source, None)

        #--------------------------------------------------
        def reload_file_source(self, filename=None, encoding='utf8'):
                """Re-read all sources backed by <filename>.

                Does nothing if no source is backed by <filename>.
                """
                # Work on a snapshot because add_file_source() writes to
                # .source_files. There may be several sources backed by
                # the same file (not very reasonable but possible), all
                # of which get reloaded.
                for src, fname in list(self.source_files.items()):
                        if fname == filename:
                                self.add_file_source(source = src, filename = fname, encoding = encoding)

        #--------------------------------------------------
        def add_cli(self, short_options='', long_options=None):
                """Add command line parameters to config data.

                short:
                        string containing one-letter options such as u'h?' for -h -?
                long:
                        list of strings
                        'conf-file=' -> --conf-file=<...>
                        'debug' -> --debug

                Options are stored in source 'cli' under 'cli::<option>'
                with <option> as returned by getopt, options without
                an argument are stored as True.

                returns False if the command line cannot be parsed, True otherwise
                """
                _log.info('adding command line arguments')
                _log.debug('raw command line is:')
                _log.debug('%s', sys.argv)
                import getopt
                if long_options is None:
                        long_options = []
                try:
                        # non-option arguments are of no interest here
                        opts, _remainder = getopt.gnu_getopt (
                                sys.argv[1:],
                                short_options,
                                long_options
                        )
                except getopt.GetoptError as exc:
                        _log.exception('error parsing command line options')
                        print('GNUmed startup: error loading command line options')
                        print('GNUmed startup:', exc)
                        return False

                data = {}
                for opt, val in opts:
                        if val == '':
                                data['%s::%s' % ('cli', opt)] = True
                        else:
                                data['%s::%s' % ('cli', opt)] = val
                self.__cfg_data['cli'] = data
                return True

A generic Borg mixin for new-style classes.

  • mixin this class with your class' ancestors to borg it

  • there may be many instances of this - PER CHILD CLASS - but they all share state

Ancestors

Methods

def add_cli(self, short_options='', long_options=None)
Expand source code
def add_cli(self, short_options='', long_options=None):
        """Parse sys.argv and store the options found as source 'cli'.

        short_options:
                string of one-letter options, say, 'h?' for -h -?
        long_options:
                list of strings, None means no long options
                'conf-file=' -> --conf-file=<...>
                'debug' -> --debug

        Each option is stored under 'cli::<option>' with <option>
        as returned by getopt. Options given without an argument
        are stored as True, all others with their argument.

        Returns False if parsing the command line failed (the
        error is logged and printed), True otherwise.
        """
        import getopt

        _log.info('adding command line arguments')
        _log.debug('raw command line is:')
        _log.debug('%s', sys.argv)

        known_long_options = [] if long_options is None else long_options
        try:
                # non-option arguments are not needed
                parsed_options, _leftover = getopt.gnu_getopt(sys.argv[1:], short_options, known_long_options)
        except getopt.GetoptError as exc:
                _log.exception('error parsing command line options')
                print('GNUmed startup: error loading command line options')
                print('GNUmed startup:', exc)
                return False

        # a later occurrence of an option overrides an earlier one
        self.__cfg_data['cli'] = {
                '%s::%s' % ('cli', flag): (True if argument == '' else argument)
                for flag, argument in parsed_options
        }
        return True

Add command line parameters to config data.

short: string containing one-letter options such as u'h?' for -h -?; long: list of strings, e.g. 'conf-file=' -> --conf-file=<...>, 'debug' -> --debug

def add_file_source(self, source=None, filename=None, encoding='utf8')
Expand source code
def add_file_source(self, source=None, filename=None, encoding='utf8'):
        """Add a source (a file) to the instance.

        source: name under which to register the data
        filename: the file to read, opened in text mode
        encoding: the encoding for reading the file

        If <filename> is None or cannot be opened, <source> is
        registered as an empty dummy source without a file.

        The file is parsed by way of self.add_stream_source(),
        exceptions raised in there are propagated.
        """
        _log.info('file source "%s": %s (%s)', source, filename, encoding)

        # the same file known under another source name ?
        for existing_source, existing_file in self.source_files.items():
                if existing_file == filename:
                        if source != existing_source:
                                _log.warning('file [%s] already known as source [%s]', filename, existing_source)
                                _log.warning('adding it as source [%s] may provoke trouble', source)

        cfg_file = None
        if filename is not None:
                try:
                        cfg_file = open(filename, mode = 'rt', encoding = encoding)
                except IOError:
                        _log.error('cannot open [%s], keeping as dummy source', filename)

        if cfg_file is None:
                filename = None
                if source in self.__cfg_data:
                        _log.warning('overriding source <%s> with dummy', source)
                self.__cfg_data[source] = {}
        else:
                # make sure the file gets closed even if parsing fails
                with cfg_file:
                        self.add_stream_source(source = source, stream = cfg_file)

        self.source_files[source] = filename

Add a source (a file) to the instance.

def add_stream_source(self, source=None, stream=None, encoding=None)
Expand source code
def add_stream_source(self, source=None, stream=None, encoding=None):
        """Parse <stream> as INI-style data and register it as <source>.

        <stream> and <encoding> are handed to parse_INI_stream()
        as is. Data already registered under <source> is replaced,
        about which a warning is logged.
        """
        parsed = parse_INI_stream(stream = stream, encoding = encoding)

        # only reached when parsing succeeded
        source_is_known = source in self.__cfg_data
        if source_is_known:
                _log.warning('overriding source <%s> with [%s]', source, stream)

        self.__cfg_data[source] = parsed
def get(self, group=None, option=None, source_order=None)
Expand source code
def get(self, group=None, option=None, source_order=None):
        """Get the value of a configuration option.

        <group> group to look in, if None the name of the
                source being searched is used as the group
        <source_order> the order in which sources are searched
                a list of tuples (source, policy), defaults
                to [('internal', 'return')]
                policy:
                        return: return only this value immediately
                        append: append to list of potential values to return
                        extend: if the value per source happens to be a list
                                extend (rather than append to) the result list

        Unknown sources are logged and skipped.

        returns None when there's no value for an option
        """
        if source_order is None:
                source_order = [('internal', 'return')]
        results = []
        for source, policy in source_order:
                # Resolve the default group per source. Do not overwrite
                # <group> itself or else the first source's name would be
                # used as the group for all the following sources, too.
                effective_group = source if group is None else group
                _log.debug('searching "%s" in [%s] in %s', option, effective_group, source)
                option_path = '%s::%s' % (effective_group, option)
                try:
                        source_data = self.__cfg_data[source]
                except KeyError:
                        _log.error('invalid config source [%s]', source)
                        _log.debug('currently known sources: %s', list(self.__cfg_data))
                        continue

                try:
                        value = source_data[option_path]
                except KeyError:
                        _log.debug('option [%s] not in group [%s] in source [%s]', option, effective_group, source)
                        continue
                _log.debug('option [%s] found in source [%s]', option_path, source)

                if policy == 'return':
                        return value

                if policy == 'extend' and isinstance(value, list):
                        results.extend(value)
                else:
                        # 'append', unknown policies, and non-list values under 'extend'
                        results.append(value)

        if not results:
                return None

        return results

Get the value of a configuration option in a config file.

<source_order>: the order in which config files are searched, a list of tuples (source, policy). policy: "return": return only this value immediately; "append": append to the list of potential values to return; "extend": if the value per source happens to be a list, extend (rather than append to) the result list.

returns NONE when there's no value for an option

def reload_file_source(self, filename=None, encoding='utf8')
Expand source code
def reload_file_source(self, filename=None, encoding='utf8'):
        """Re-add every source backed by <filename>.

        Each source registered in .source_files with <filename>
        is passed to self.add_file_source() again, using
        <encoding>. Nothing happens if no source matches.
        """
        # collect first, there can be several sources backed by
        # the same file (not very reasonable but possible)
        affected = [
                (src, fname)
                for src, fname in self.source_files.items()
                if fname == filename
        ]
        for src, fname in affected:
                self.add_file_source(source = src, filename = fname, encoding = encoding)
def remove_source(self, source)
Expand source code
def remove_source(self, source):
        """Forget <source>: drop its data and its file association.

        A warning is logged if no data is known for <source>.
        A missing file association is silently ignored.
        """
        _log.info('removing source <%s>', source)

        if source in self.__cfg_data:
                del self.__cfg_data[source]
        else:
                _log.warning("source <%s> doesn't exist", source)

        # not every source is backed by a file
        self.source_files.pop(source, None)

Remove a source from the instance.

def set_option(self, option=None, value=None, group=None, source=None)
Expand source code
def set_option(self, option=None, value=None, group=None, source=None):
        """Set a particular option to a particular value.

        option: name of the option, must not be None
        value: value to store, must not be None
        group: group of the option, defaults to the name of the source
        source: source to store the option in, defaults to 'internal',
                a source not yet known is created on the fly

        Raises ValueError if <option> or <value> is None.

        Note that this does NOT PERSIST the option anywhere !
        """
        if option is None or value is None:
                raise ValueError('neither <option> nor <value> can be None')
        if source is None:
                source = 'internal'
        if group is None:
                group = source
        option_path = '%s::%s' % (group, option)
        # setdefault() so that an explicitly named but as yet
        # unknown source does not fail with a KeyError
        self.__cfg_data.setdefault(source, {})[option_path] = value

Set a particular option to a particular value.

Note that this does NOT PERSIST the option anywhere !