Module Gnumed.pycommon.gmNetworkTools

GNUmed internetworking tools.

Functions

def check_for_update(url=None, current_branch=None, current_version=None, consider_latest_branch=False)
Expand source code
def check_for_update(url=None, current_branch=None, current_version=None, consider_latest_branch=False):
        """Check for new releases at <url>.

        The version file at <url> is parsed as an INI style stream.
        The group 'latest branch' (option 'branch') names the latest
        branch, and each group 'branch <name>' (option 'latest release')
        names the latest release on that branch.

        Args:
                url: where to load the version file from
                current_branch: the branch the running code belongs to
                current_version: the version of the running code, development markers ('git head', 'head', 'tip', 'dev', 'devel') are always considered up to date
                consider_latest_branch: whether releases on the latest branch count as updates, too

        Returns (bool, text).
        True: new release available
        False: up to date
        None: don't know
        """
        if current_version is None:
                _log.debug('<current_version> is None, currency unknown')
                return (None, None)

        if current_version.casefold() in ['git head', 'head', 'tip', 'dev', 'devel']:
                _log.debug('[%s] always considered up to date', current_version)
                return (False, None)

        try:
                remote_file = urllib.request.urlopen(url)
        except (urllib.error.URLError, ValueError, OSError, IOError):
                # IOError: socket.error
                _log.exception("cannot retrieve version file from [%s]", url)
                return (None, _('Cannot retrieve version information from:\n\n%s') % url)

        _log.debug('retrieving version information from [%s]', url)

        cfg = gmCfgINI.gmCfgData()
        try:
                cfg.add_stream_source(source = 'gm-versions', stream = remote_file, encoding = u'utf8')
        except UnicodeDecodeError:
                _log.exception("cannot read version file from [%s]", url)
                return (None, _('Cannot read version information from:\n\n%s') % url)
        finally:
                # release the connection on every path, including unexpected
                # parser errors which propagate to the caller
                remote_file.close()

        latest_branch = cfg.get('latest branch', 'branch', source_order = [('gm-versions', 'return')])
        latest_release_on_latest_branch = cfg.get('branch %s' % latest_branch, 'latest release', source_order = [('gm-versions', 'return')])
        latest_release_on_current_branch = cfg.get('branch %s' % current_branch, 'latest release', source_order = [('gm-versions', 'return')])

        cfg.remove_source('gm-versions')

        _log.info('current release: %s', current_version)
        _log.info('current branch: %s', current_branch)
        _log.info('latest release on current branch: %s', latest_release_on_current_branch)
        _log.info('latest branch: %s', latest_branch)
        _log.info('latest release on latest branch: %s', latest_release_on_latest_branch)

        # anything known ?
        no_release_information_available = (
                (
                        (latest_release_on_current_branch is None) and
                        (latest_release_on_latest_branch is None)
                ) or (
                        not consider_latest_branch and
                        (latest_release_on_current_branch is None)
                )
        )
        if no_release_information_available:
                _log.warning('no release information available')
                msg = _('There is no version information available from:\n\n%s') % url
                return (None, msg)

        # up to date ?
        if consider_latest_branch:
                _log.debug('latest branch taken into account')
                if latest_release_on_latest_branch is None:
                        # the check above guarantees the current branch release is known here
                        if compare_versions(latest_release_on_current_branch, current_version) in [-1, 0]:
                                _log.debug('up to date: current version >= latest version on current branch and no latest branch available')
                                return (False, None)
                else:
                        if compare_versions(latest_release_on_latest_branch, current_version) in [-1, 0]:
                                _log.debug('up to date: current version >= latest version on latest branch')
                                return (False, None)
        else:
                _log.debug('latest branch not taken into account')
                if compare_versions(latest_release_on_current_branch, current_version) in [-1, 0]:
                        _log.debug('up to date: current version >= latest version on current branch')
                        return (False, None)

        new_release_on_current_branch_available = (
                (latest_release_on_current_branch is not None) and
                (compare_versions(latest_release_on_current_branch, current_version) == 1)
        )
        _log.info('%snew release on current branch available', gmTools.bool2str(new_release_on_current_branch_available, '', 'no '))

        # NOTE(review): branches are compared with the plain ">" operator, which
        # for strings is lexicographic ("1.10" sorts before "1.9") - confirm intent.
        # NOTE(review): presumably raises TypeError if <latest_branch> is a string
        # while <current_branch> is None - verify against callers.
        new_release_on_latest_branch_available = (
                (latest_branch is not None)
                        and
                (
                        (latest_branch > current_branch) or (
                                (latest_branch == current_branch) and
                                (compare_versions(latest_release_on_latest_branch, current_version) == 1)
                        )
                )
        )
        _log.info('%snew release on latest branch available', gmTools.bool2str(new_release_on_latest_branch_available, '', 'no '))

        if not (new_release_on_current_branch_available or new_release_on_latest_branch_available):
                _log.debug('up to date: no new releases available')
                return (False, None)

        # not up to date
        msg = _('A new version of GNUmed is available.\n\n')
        msg += _(' Your current version: "%s"\n') % current_version
        if consider_latest_branch:
                if new_release_on_current_branch_available:
                        msg += '\n'
                        msg += _(' New version: "%s"') % latest_release_on_current_branch
                        msg += '\n'
                        msg += _(' - bug fixes only\n')
                        msg += _(' - database fixups may be needed\n')
                if new_release_on_latest_branch_available:
                        # same branch: already reported as the current branch release above
                        if current_branch != latest_branch:
                                msg += '\n'
                                msg += _(' New version: "%s"') % latest_release_on_latest_branch
                                msg += '\n'
                                msg += _(' - bug fixes and new features\n')
                                msg += _(' - database upgrade required\n')
        else:
                msg += '\n'
                msg += _(' New version: "%s"') % latest_release_on_current_branch
                msg += '\n'
                msg += _(' - bug fixes only\n')
                msg += _(' - database fixups may be needed\n')

        msg += '\n\n'
        msg += _(
                'Note, however, that this version may not yet\n'
                'be available *pre-packaged* for your system.'
        )

        msg += '\n\n'
        msg += _('Details are found on <https://www.gnumed.de>.\n')
        msg += '\n'
        msg += _('Version information loaded from:\n\n %s') % url

        return (True, msg)

Check for new releases at <url>.

Returns (bool, text). True: a new release is available; False: up to date; None: don't know.

def compare_versions(left_version, right_version)
Expand source code
def compare_versions(left_version, right_version):
        """Compare two dotted version strings.

        Development markers ('head', 'dev', 'devel') rank above any
        other version. Otherwise the first two dot-separated components
        are compared as one decimal number ("1.8"), then the third
        component, in which 'rc' is replaced by '0.' before conversion.
        Components missing from a short version such as "1.8" are
        treated as '0' rather than raising IndexError.

        Returns:
                 0: left == right
                -1: left < right
                 1: left > right
        """
        _log.debug('comparing [%s] with [%s]', left_version, right_version)
        if left_version == right_version:
                _log.debug('same version')
                return 0

        if right_version in ['head', 'dev', 'devel']:
                _log.debug('development code')
                return -1

        if left_version in ['head', 'dev', 'devel']:
                _log.debug('development code')
                return 1

        left_parts = left_version.split('.')
        right_parts = right_version.split('.')
        # pad to three components so that "1.8" compares like "1.8.0"
        # (a non-positive multiplier yields an empty list, longer versions are untouched)
        left_parts += ['0'] * (3 - len(left_parts))
        right_parts += ['0'] * (3 - len(right_parts))

        # the first element of the input2decimal() result is ignored
        # (presumably a success flag - TODO confirm)
        # NOTE(review): comparing "major.minor" as a decimal ranks "1.10" below "1.9" - confirm intent
        _, left_major = gmTools.input2decimal('%s.%s' % (left_parts[0], left_parts[1]))
        _, right_major = gmTools.input2decimal('%s.%s' % (right_parts[0], right_parts[1]))

        if left_major < right_major:
                _log.debug('left version [%s] < right version [%s]: major part', left_version, right_version)
                return -1

        if left_major > right_major:
                _log.debug('left version [%s] > right version [%s]: major part', left_version, right_version)
                return 1

        _, left_part3 = gmTools.input2decimal(left_parts[2].replace('rc', '0.'))
        _, right_part3 = gmTools.input2decimal(right_parts[2].replace('rc', '0.'))

        if left_part3 < right_part3:
                _log.debug('left version [%s] < right version [%s]: minor part', left_version, right_version)
                return -1

        if left_part3 > right_part3:
                _log.debug('left version [%s] > right version [%s]: minor part', left_version, right_version)
                return 1

        return 0

Returns 0 if left == right, -1 if left < right, 1 if left > right.

def compose_and_send_email(sender=None,
receiver=None,
message=None,
server=None,
auth=None,
debug=False,
subject=None,
attachments=None)
Expand source code
def compose_and_send_email(sender=None, receiver=None, message=None, server=None, auth=None, debug=False, subject=None, attachments=None):
        """Compose an email with compose_email() and pass it on to send_email().

        <attachments> is handed to compose_email() as <files2attach>.

        Returns whatever send_email() returns.
        """
        composed = compose_email(sender = sender, receiver = receiver, message = message, subject = subject, files2attach = attachments)
        delivered = send_email(sender = sender, receiver = receiver, email = composed, server = server, auth = auth, debug = debug)
        return delivered
def compose_email(sender=None, receiver=None, message=None, subject=None, files2attach=None)
Expand source code
def compose_email(sender=None, receiver=None, message=None, subject=None, files2attach=None):
        """Compose a plain-text email, optionally with file attachments.

        Args:
                sender: From address, default_mail_sender if None
                receiver: list of To addresses, [default_mail_receiver] if None
                message: body text, leading whitespace and line breaks are stripped
                subject: Subject line, a test subject is used if None
                files2attach: [(filename, mimetype-or-None), ...], filenames are unicode, text/* files are read as utf8, all others as binary; a missing or None mimetype is guessed from the file

        Returns:
                A MIMEText instance if <files2attach> is None, a MIMEMultipart instance otherwise.

        Raises:
                ValueError: if <message> is None
        """
        if message is None:
                raise ValueError('<message> is None, cannot compose email')

        message = message.lstrip().lstrip('\r\n').lstrip()

        if sender is None:
                sender = default_mail_sender

        if receiver is None:
                receiver = [default_mail_receiver]

        if subject is None:
                subject = 'compose_email() test'

        if files2attach is None:
                email = MIMEText(message, 'plain', 'utf8')
        else:
                email = MIMEMultipart()
                email.attach(MIMEText(message, 'plain', 'utf8'))

        email['From'] = sender
        email['To'] = ', '.join(receiver)
        email['Subject'] = subject

        if files2attach is None:
                return email

        for file2attach in files2attach:
                filename = file2attach[0]
                try:
                        mimetype = file2attach[1]
                except IndexError:
                        mimetype = None
                # an explicit None is a documented way of asking for auto-detection
                if mimetype is None:
                        mimetype = gmMimeLib.guess_mimetype(filename = filename)
                # text/*
                if mimetype.startswith('text/'):
                        with open(filename, mode = 'rt', encoding = 'utf8') as txt:
                                attachment = MIMEText(txt.read(), 'plain', 'utf8')
                # image/*
                elif mimetype.startswith('image/'):
                        with open(filename, mode = 'rb') as img:
                                attachment = MIMEImage(img.read())
                # audio/*
                elif mimetype.startswith('audio/'):
                        with open(filename, mode = 'rb') as song:
                                attachment = MIMEAudio(song.read())
                # catch-all application/*
                else:
                        _log.debug('attaching [%s] with type [%s]', filename, mimetype)
                        # a type lacking "/subtype" falls back to the MIMEApplication default subtype
                        mime_subtype = mimetype.partition('/')[2] or 'octet-stream'
                        with open(filename, mode = 'rb') as data:
                                attachment = MIMEApplication(data.read(), mime_subtype)

                disposition = 'attachment; filename="%s"' % gmTools.fname_from_path(filename)
                try:
                        # replace_header() raises KeyError if the header does not exist yet
                        attachment.replace_header('Content-Disposition', disposition)
                except KeyError:
                        attachment.add_header('Content-Disposition', disposition)
                email.attach(attachment)

        return email
def download_data_pack(pack_url, filename=None, md5_url=None)
Expand source code
def download_data_pack(pack_url, filename=None, md5_url=None):
        """Download a data pack and verify it against its published MD5 checksum.

        Args:
                pack_url: where to download the data pack from
                filename: local name for the data pack, a unique name is generated by download_file() if None
                md5_url: where to download the MD5 checksum file from, its first line is taken as the expected checksum

        Returns:
                (True, <data pack filename>) if the checksums match
                (False, (<expected MD5>, <calculated MD5>)) otherwise; a checksum
                that could not be determined because a download failed is None
        """
        _log.debug('downloading data pack from: %s', pack_url)
        dp_fname = download_file(pack_url, filename = filename, suffix = 'zip')
        if dp_fname is None:
                # download_file() returns None on failure, nothing to verify
                _log.error('cannot download data pack from: %s', pack_url)
                return (False, (None, None))

        _log.debug('downloading MD5 from: %s', md5_url)
        md5_fname = download_file(md5_url, filename = dp_fname + '.md5')
        md5_calculated = gmTools.file2md5(dp_fname, return_hex = True)
        if md5_fname is None:
                _log.error('cannot download MD5 from: %s', md5_url)
                return (False, (None, md5_calculated))

        # utf-8-sig: transparently skips a byte order mark, if any
        with open(md5_fname, mode = 'rt', encoding = 'utf-8-sig') as md5_file:
                md5_expected = md5_file.readline().strip('\n')
        _log.debug('expected MD5: %s', md5_expected)
        _log.debug('calculated MD5: %s', md5_calculated)

        if md5_calculated != md5_expected:
                _log.error('mismatch of expected vs calculated MD5: [%s] vs [%s]', md5_expected, md5_calculated)
                return (False, (md5_expected, md5_calculated))

        return True, dp_fname
def download_data_packs_list(url, filename=None)
Expand source code
def download_data_packs_list(url, filename=None):
        """Download the list of data packs from <url>.

        Delegates to download_file() with suffix 'conf' and returns its result.
        """
        packs_list_fname = download_file(url, filename = filename, suffix = 'conf')
        return packs_list_fname
def download_file(url, filename=None, suffix=None)
Expand source code
def download_file(url, filename=None, suffix=None):
        """Download <url> into a local file.

        Args:
                url: what to download
                filename: where to store the download, a unique 'gm-dl-' prefixed name is generated if None
                suffix: suffix for the generated filename, only used if <filename> is None

        Returns:
                The local filename as reported by urlretrieve(), or None if
                the download raised ValueError or OSError.
        """
        target = filename
        if target is None:
                target = gmTools.get_unique_filename(prefix = 'gm-dl-', suffix = suffix)
        _log.debug('downloading [%s] into [%s]', url, target)
        try:
                local_name, response_headers = urllib.request.urlretrieve(url, target)
        except (ValueError, OSError, IOError):
                _log.exception('cannot download from [%s]', url)
                gmLog2.log_stack_trace()
                return None
        else:
                _log.debug('%s' % response_headers)
                return local_name
def install_data_pack(data_pack=None, conn=None)
Expand source code
def install_data_pack(data_pack=None, conn=None):
        """Run the install script of an unzipped data pack and log the installation.

        Args:
                data_pack: dict, read for the keys 'unzip_dir' (directory holding install-data-pack.sql) and 'pack_url' (recorded via gm.log_script_insertion)
                conn: database connection used for running the script and for logging

        Returns:
                True if the script ran with result 0 and the insertion was
                logged and committed, False otherwise.
        """
        from Gnumed.pycommon import gmPsql
        script_runner = gmPsql.Psql(conn)
        install_script = os.path.join(data_pack['unzip_dir'], 'install-data-pack.sql')
        exit_status = script_runner.run(install_script)
        if exit_status != 0:
                _log.error('error installing data pack: %s', data_pack)
                return False

        log_args = {'name': data_pack['pack_url'], 'ver': 'current'}
        cursor = conn.cursor()
        cursor.execute('select gm.log_script_insertion(%(name)s, %(ver)s)', log_args)
        cursor.close()
        conn.commit()
        return True
def mirror_url(url: str, base_dir: str = None, verbose: bool = False) ‑> str
Expand source code
def mirror_url(url:str, base_dir:str=None, verbose:bool=False) -> str:
        """Mirror the web*page* at _url_, non-recursively, by running wget.

        Note: Not for mirroring a *site* (recursively).

        Args:
                url: the URL to mirror
                base_dir: where to store the page and its prerequisites, a sandbox dir named after the URL is created if None
                verbose: add --debug to the wget command line and run the process verbosely

        Returns:
                The base directory if the wget process succeeded, None otherwise.
        """
        assert (url is not None), '<url> must not be None'
        _log.debug('mirroring: %s', url)
        if base_dir is None:
                # derive a directory name from the URL sans scheme
                sans_scheme = url.split('://')[-1]
                flattened = sans_scheme.strip(':').strip('/').replace('/', '#')
                sandbox_prefix = gmTools.fname_sanitize(flattened)
                base_dir = gmTools.mk_sandbox_dir(prefix = sandbox_prefix + '-')
        _log.debug('base dir: %s', base_dir)
        wget_options = [
                '--directory-prefix=%s' % base_dir,
                '--no-remove-listing',
                '--timestamping',
                '--page-requisites',
                '--continue',
                '--convert-links',
                '--user-agent=""',
                '--execute', 'robots=off',
                '--wait=1'
        ]
        if verbose:
                wget_options.append('--debug')
        wget_cmd = ['wget'] + wget_options + [url]
        success, _ret_code, _stdout = gmShellAPI.run_process(cmd_line = wget_cmd, timeout = 15, verbose = verbose)
        return base_dir if success else None

Mirror the webpage at url, non-recursively.

Note: Not for mirroring a site (recursively).

Args

url
the URL to mirror
base_dir
where to store the page and its prerequisites, sandbox dir under tmp_dir if None
def open_url_in_browser(url: str = None, new: int = 2, autoraise: bool = True) ‑> bool
Expand source code
def open_url_in_browser(url:str=None, new:int=2, autoraise:bool=True) -> bool:
        """Open a URL in a web browser.

        Args:
                url: URL to open, nothing is done if empty or None
                new: passed to webbrowser.open(), whether to open a new browser, a new tab in a running browser, or a tab OR a browser, 2 = open new tab if possible
                autoraise: passed to webbrowser.open()

        Returns:
                False if calling the browser raised an error, True otherwise
                (including when there was no URL to open).
        """
        if url:
                try:
                        webbrowser.open(url, new = new, autoraise = autoraise)
                except (webbrowser.Error, OSError, UnicodeEncodeError):
                        _log.exception('error calling browser with url=[%s]', url)
                        return False

        return True

Open an URL in a browser.

Args

url
URL to open
new
whether to open a new browser, a new tab in a running browser, or a tab OR a browser, 2 = open new tab if possible
def open_urls_in_browser(urls: list[str] = None,
new: int = 2,
autoraise: bool = True,
initial_delay: int = 200,
*args,
**kwargs) ‑> bool
Expand source code
def open_urls_in_browser(urls:list[str]=None, new:int=2, autoraise:bool=True, initial_delay:int=200, *args, **kwargs) -> bool:
        """Open several URLs in a web browser, one after the other.

        Args:
                urls: URLs to open, nothing is done if empty or None
                new: whether to open a new browser, a new tab in a running browser, or a tab OR a browser (see Python docs), 2 = open new tab if possible
                autoraise: passed on to open_url_in_browser()
                initial_delay: milliseconds to sleep after opening the first URL (so that the browser may start up), negative values count as 0

        Returns:
                Always True, failures to open individual URLs are not reported.
        """
        if not urls:
                return True

        # only the first URL is followed by a real pause, time cannot run backwards
        startup_pause = max(initial_delay, 0) / 1000
        for position, url in enumerate(urls):
                open_url_in_browser(url = url, new = new, autoraise = autoraise)
                time.sleep(startup_pause if position == 0 else 0)
        return True

Open URLs in a browser.

Args

urls
URLs to open
new
whether to open a new browser, a new tab in a running browser, or a tab OR a browser (see Python docs), 2 = open new tab if possible
initial_delay
milliseconds to sleep after opening first URL (so that the browser may start up)
def send_email(sender=None, receiver=None, email=None, server=None, auth=None, debug=False)
Expand source code
def send_email(sender=None, receiver=None, email=None, server=None, auth=None, debug=False):
        """Send a composed email via SMTP.

        Args:
                sender: envelope sender, default_mail_sender if None
                receiver: list of envelope recipients, [default_mail_receiver] if None
                email: the message to send, must offer .as_string()
                server: SMTP server to connect to, default_mail_server if None
                auth: dict with keys 'user' and 'password' for logging in, no login is attempted if None
                debug: passed to the SMTP session as debug level

        Returns:
                False if an smtplib.SMTPException occurred, True otherwise.
                Refused recipients are logged but do not make the result False.

        Raises:
                ValueError: if <email> is None
        """
        if email is None:
                raise ValueError('<email> is None, cannot send')

        if sender is None:
                sender = default_mail_sender

        if receiver is None:
                receiver = [default_mail_receiver]

        if server is None:
                server = default_mail_server

        import smtplib
        failed = False
        refused = []
        session = None
        try:
                session = smtplib.SMTP(server)
                session.set_debuglevel(debug)
                try:
                        session.starttls()
                except smtplib.SMTPException:
                        # NOTE(review): delivery (including any login below) continues
                        # unencrypted if TLS cannot be enabled - confirm this is intended
                        _log.error('cannot enable TLS on [%s]', server)
                session.ehlo()
                if auth is not None:
                        session.login(auth['user'], auth['password'])
                refused = session.sendmail(sender, receiver, email.as_string())
                session.quit()
        except smtplib.SMTPException:
                failed = True
                _log.exception('cannot send email')
                gmLog2.log_stack_trace()
        finally:
                # release the connection on failure as well as on success,
                # close() is harmless after a successful quit()
                if session is not None:
                        session.close()

        if len(refused) > 0:
                _log.error("refused recipients: %s", refused)

        if failed:
                return False

        return True
def unzip_data_pack(filename=None)
Expand source code
def unzip_data_pack(filename=None):
        """Unzip a data pack into a directory next to the archive.

        The target directory is <filename> without its extension.

        Args:
                filename: the zip archive to extract

        Returns:
                The directory the archive was extracted into, or None if
                the file is not a valid zip archive.
        """
        unzip_dir = os.path.splitext(filename)[0]
        _log.debug('unzipping data pack into [%s]', unzip_dir)
        gmTools.mkdir(unzip_dir)
        try:
                data_pack = zipfile.ZipFile(filename, 'r')
        except zipfile.BadZipFile:
                _log.exception('cannot unzip data pack [%s]', filename)
                gmLog2.log_stack_trace()
                return None

        # close the archive even if extraction fails, extraction errors still propagate
        with data_pack:
                data_pack.extractall(unzip_dir)

        return unzip_dir