Module Gnumed.pycommon.gmNetworkTools
GNUmed internetworking tools.
Functions
def check_for_update(url=None, current_branch=None, current_version=None, consider_latest_branch=False)-
Expand source code
def check_for_update(url=None, current_branch=None, current_version=None, consider_latest_branch=False):
    """Check for new releases at <url>.

    Args:
        url: where to retrieve the version file from, it is read as
            UTF-8 and parsed by gmCfgINI
        current_branch: branch of the running code
        current_version: version of the running code
        consider_latest_branch: whether releases on the latest branch
            are taken into account in addition to the current branch

    Returns:
        (bool, text)
        True: new release available
        False: up to date
        None: don't know
    """
    if current_version is None:
        _log.debug('<current_version> is None, currency unknown')
        return (None, None)

    if current_version.casefold() in ['git head', 'head', 'tip', 'dev', 'devel']:
        _log.debug('[%s] always considered up to date', current_version)
        return (False, None)

    try:
        remote_file = urllib.request.urlopen(url)
    except (urllib.error.URLError, ValueError, OSError, IOError):
        # IOError: socket.error
        _log.exception("cannot retrieve version file from [%s]", url)
        return (None, _('Cannot retrieve version information from:\n\n%s') % url)

    _log.debug('retrieving version information from [%s]', url)
    cfg = gmCfgINI.gmCfgData()
    try:
        cfg.add_stream_source(source='gm-versions', stream=remote_file, encoding='utf8')
    except UnicodeDecodeError:
        _log.exception("cannot read version file from [%s]", url)
        return (None, _('Cannot read version information from:\n\n%s') % url)
    finally:
        # release the connection no matter how parsing ended,
        # including exceptions which are not handled here
        remote_file.close()

    latest_branch = cfg.get('latest branch', 'branch', source_order=[('gm-versions', 'return')])
    latest_release_on_latest_branch = cfg.get('branch %s' % latest_branch, 'latest release', source_order=[('gm-versions', 'return')])
    latest_release_on_current_branch = cfg.get('branch %s' % current_branch, 'latest release', source_order=[('gm-versions', 'return')])
    cfg.remove_source('gm-versions')
    _log.info('current release: %s', current_version)
    _log.info('current branch: %s', current_branch)
    _log.info('latest release on current branch: %s', latest_release_on_current_branch)
    _log.info('latest branch: %s', latest_branch)
    _log.info('latest release on latest branch: %s', latest_release_on_latest_branch)

    # anything known ?
    no_release_information_available = (
        (
            (latest_release_on_current_branch is None)
                and
            (latest_release_on_latest_branch is None)
        ) or (
            not consider_latest_branch
                and
            (latest_release_on_current_branch is None)
        )
    )
    if no_release_information_available:
        _log.warning('no release information available')
        msg = _('There is no version information available from:\n\n%s') % url
        return (None, msg)

    # up to date ?
    if consider_latest_branch:
        _log.debug('latest branch taken into account')
        if latest_release_on_latest_branch is None:
            if compare_versions(latest_release_on_current_branch, current_version) in [-1, 0]:
                _log.debug('up to date: current version >= latest version on current branch and no latest branch available')
                return (False, None)
        else:
            if compare_versions(latest_release_on_latest_branch, current_version) in [-1, 0]:
                _log.debug('up to date: current version >= latest version on latest branch')
                return (False, None)
    else:
        _log.debug('latest branch not taken into account')
        if compare_versions(latest_release_on_current_branch, current_version) in [-1, 0]:
            _log.debug('up to date: current version >= latest version on current branch')
            return (False, None)

    new_release_on_current_branch_available = (
        (latest_release_on_current_branch is not None)
            and
        (compare_versions(latest_release_on_current_branch, current_version) == 1)
    )
    _log.info('%snew release on current branch available', gmTools.bool2str(new_release_on_current_branch_available, '', 'no '))
    # NOTE(review): branches are compared with plain ">" (string
    # comparison if both are strings) -- confirm this orders branch
    # names as intended and that <current_branch> cannot be None here
    new_release_on_latest_branch_available = (
        (latest_branch is not None)
            and
        (
            (latest_branch > current_branch)
                or
            (
                (latest_branch == current_branch)
                    and
                (compare_versions(latest_release_on_latest_branch, current_version) == 1)
            )
        )
    )
    _log.info('%snew release on latest branch available', gmTools.bool2str(new_release_on_latest_branch_available, '', 'no '))
    if not (new_release_on_current_branch_available or new_release_on_latest_branch_available):
        _log.debug('up to date: no new releases available')
        return (False, None)

    # not up to date
    msg = _('A new version of GNUmed is available.\n\n')
    msg += _(' Your current version: "%s"\n') % current_version
    if consider_latest_branch:
        if new_release_on_current_branch_available:
            msg += '\n'
            msg += _(' New version: "%s"') % latest_release_on_current_branch
            msg += '\n'
            msg += _(' - bug fixes only\n')
            msg += _(' - database fixups may be needed\n')
        if new_release_on_latest_branch_available:
            if current_branch != latest_branch:
                msg += '\n'
                msg += _(' New version: "%s"') % latest_release_on_latest_branch
                msg += '\n'
                msg += _(' - bug fixes and new features\n')
                msg += _(' - database upgrade required\n')
    else:
        msg += '\n'
        msg += _(' New version: "%s"') % latest_release_on_current_branch
        msg += '\n'
        msg += _(' - bug fixes only\n')
        msg += _(' - database fixups may be needed\n')
    msg += '\n\n'
    msg += _(
        'Note, however, that this version may not yet\n'
        'be available *pre-packaged* for your system.'
    )
    msg += '\n\n'
    msg += _('Details are found on <https://www.gnumed.de>.\n')
    msg += '\n'
    msg += _('Version information loaded from:\n\n %s') % url
    return (True, msg)
<url>. Returns (bool, text). True: new release available; False: up to date; None: don't know.
def compare_versions(left_version, right_version)-
Expand source code
def compare_versions(left_version, right_version):
    """Compare two version strings.

    Returns:
        0: left == right
        -1: left < right
        1: left > right
    """
    _log.debug('comparing [%s] with [%s]', left_version, right_version)
    if left_version == right_version:
        _log.debug('same version')
        return 0

    # development code on the right makes left the smaller one,
    # development code on the left makes left the larger one
    dev_markers = ['head', 'dev', 'devel']
    for candidate, verdict in ((right_version, -1), (left_version, 1)):
        if candidate in dev_markers:
            _log.debug('development code')
            return verdict

    lhs = left_version.split('.')
    rhs = right_version.split('.')

    # the first two dotted components are handed to
    # gmTools.input2decimal() as one "X.Y" string
    # NOTE(review): presumably this yields a decimal number, in
    # which case "1.10" would rank below "1.9" -- confirm
    _, lhs_major = gmTools.input2decimal('%s.%s' % (lhs[0], lhs[1]))
    _, rhs_major = gmTools.input2decimal('%s.%s' % (rhs[0], rhs[1]))
    if lhs_major < rhs_major:
        _log.debug('left version [%s] < right version [%s]: major part', left_version, right_version)
        return -1

    if lhs_major > rhs_major:
        _log.debug('left version [%s] > right version [%s]: major part', left_version, right_version)
        return 1

    # third component, "rc" is rewritten to "0." before conversion
    _, lhs_minor = gmTools.input2decimal(lhs[2].replace('rc', '0.'))
    _, rhs_minor = gmTools.input2decimal(rhs[2].replace('rc', '0.'))
    if lhs_minor < rhs_minor:
        _log.debug('left version [%s] < right version [%s]: minor part', left_version, right_version)
        return -1

    if lhs_minor > rhs_minor:
        _log.debug('left version [%s] > right version [%s]: minor part', left_version, right_version)
        return 1

    return 0
def compose_and_send_email(sender=None,
receiver=None,
message=None,
server=None,
auth=None,
debug=False,
subject=None,
attachments=None)-
Expand source code
def compose_and_send_email(sender=None, receiver=None, message=None, server=None, auth=None, debug=False, subject=None, attachments=None):
    """Compose an email with compose_email() and pass it to send_email().

    <attachments> is handed to compose_email() as <files2attach>.

    Returns:
        Whatever send_email() returns.
    """
    composed = compose_email(
        sender=sender,
        receiver=receiver,
        message=message,
        subject=subject,
        files2attach=attachments
    )
    outcome = send_email(
        sender=sender,
        receiver=receiver,
        email=composed,
        server=server,
        auth=auth,
        debug=debug
    )
    return outcome
Expand source code
def _mime_part_from_file(filename, mimetype=None):
    """Wrap the content of <filename> into a MIME part matching <mimetype>.

    The mimetype is guessed by gmMimeLib if None. Text files are read
    as UTF-8, anything else is read as binary.
    """
    if mimetype is None:
        mimetype = gmMimeLib.guess_mimetype(filename=filename)
    if mimetype.startswith('text/'):
        with open(filename, mode='rt', encoding='utf8') as txt:
            part = MIMEText(txt.read(), 'plain', 'utf8')
    elif mimetype.startswith('image/'):
        with open(filename, mode='rb') as img:
            part = MIMEImage(img.read())
    elif mimetype.startswith('audio/'):
        with open(filename, mode='rb') as song:
            part = MIMEAudio(song.read())
    else:
        # catch-all application/*
        _log.debug('attaching [%s] with type [%s]', filename, mimetype)
        # tolerate a mimetype lacking the "/subtype" part
        mime_subtype = mimetype.partition('/')[2] or 'octet-stream'
        with open(filename, mode='rb') as data:
            part = MIMEApplication(data.read(), mime_subtype)
    disposition = 'attachment; filename="%s"' % gmTools.fname_from_path(filename)
    try:
        part.replace_header('Content-Disposition', disposition)
    except KeyError:
        # no such header yet
        part.add_header('Content-Disposition', disposition)
    return part


def compose_email(sender=None, receiver=None, message=None, subject=None, files2attach=None):
    """Compose an email, optionally with files attached.

    Args:
        sender: "From" address, default_mail_sender if None
        receiver: list of "To" addresses, a single address string is
            accepted, too, [default_mail_receiver] if None
        message: body text, leading whitespace is stripped, sent as
            UTF-8 text/plain
        subject: subject line, 'compose_email() test' if None
        files2attach: [(filename, mimetype-or-None), ...], the mimetype
            is guessed if missing or None

    Returns:
        A MIMEText instance if <files2attach> is None, else a
        MIMEMultipart instance holding the body and the attachments.

    Raises:
        ValueError: if <message> is None
    """
    if message is None:
        raise ValueError('<message> is None, cannot compose email')

    # .lstrip() without arguments also removes leading \r and \n
    message = message.lstrip()
    if sender is None:
        sender = default_mail_sender
    if receiver is None:
        receiver = [default_mail_receiver]
    elif isinstance(receiver, str):
        # joining a bare string would separate its characters
        receiver = [receiver]
    if subject is None:
        subject = 'compose_email() test'

    body = MIMEText(message, 'plain', 'utf8')
    if files2attach is None:
        mail = body
    else:
        mail = MIMEMultipart()
        mail.attach(body)
    mail['From'] = sender
    mail['To'] = ', '.join(receiver)
    mail['Subject'] = subject
    if files2attach is None:
        return mail

    for file2attach in files2attach:
        filename = file2attach[0]
        try:
            mimetype = file2attach[1]
        except IndexError:
            mimetype = None
        mail.attach(_mime_part_from_file(filename, mimetype))
    return mail
Expand source code
def download_data_pack(pack_url, filename=None, md5_url=None):
    """Download a data pack and verify it against its published MD5.

    Args:
        pack_url: where to download the data pack from
        filename: where to store the data pack, passed on to download_file()
        md5_url: where to download the expected MD5 from, its first
            line is taken as the expected hex digest

    Returns:
        (True, <data pack filename>) if the MD5 matches,
        (False, (<expected MD5>, <calculated MD5>)) otherwise, both
        being None if either download failed.
    """
    _log.debug('downloading data pack from: %s', pack_url)
    dp_fname = download_file(pack_url, filename=filename, suffix='zip')
    if dp_fname is None:
        # download_file() signals failure by returning None
        _log.error('cannot download data pack from: %s', pack_url)
        return (False, (None, None))

    _log.debug('downloading MD5 from: %s', md5_url)
    md5_fname = download_file(md5_url, filename=dp_fname + '.md5')
    if md5_fname is None:
        _log.error('cannot download MD5 from: %s', md5_url)
        return (False, (None, None))

    # utf-8-sig: transparently skips a BOM, if any
    with open(md5_fname, mode='rt', encoding='utf-8-sig') as md5_file:
        md5_expected = md5_file.readline().strip('\n')
    _log.debug('expected MD5: %s', md5_expected)
    md5_calculated = gmTools.file2md5(dp_fname, return_hex=True)
    _log.debug('calculated MD5: %s', md5_calculated)
    if md5_calculated != md5_expected:
        _log.error('mismatch of expected vs calculated MD5: [%s] vs [%s]', md5_expected, md5_calculated)
        return (False, (md5_expected, md5_calculated))

    return True, dp_fname
Expand source code
def download_data_packs_list(url, filename=None):
    """Download the list of data packs from <url>.

    Delegates to download_file() with a 'conf' suffix.

    Returns:
        Whatever download_file() returns.
    """
    packs_list_fname = download_file(url, filename=filename, suffix='conf')
    return packs_list_fname
Expand source code
def download_file(url, filename=None, suffix=None):
    """Download <url> into a local file.

    Args:
        url: what to download
        filename: where to store the download, a unique name with
            prefix 'gm-dl-' is generated if None
        suffix: suffix for the generated name, only used if <filename> is None

    Returns:
        The name of the downloaded file, or None on failure.
    """
    target = filename
    if target is None:
        target = gmTools.get_unique_filename(prefix='gm-dl-', suffix=suffix)
    _log.debug('downloading [%s] into [%s]', url, target)
    try:
        dl_name, headers = urllib.request.urlretrieve(url, target)
    except (ValueError, OSError, IOError):
        _log.exception('cannot download from [%s]', url)
        gmLog2.log_stack_trace()
        return None
    else:
        _log.debug('%s' % headers)
        return dl_name
Expand source code
def install_data_pack(data_pack=None, conn=None):
    """Run the install script of an unzipped data pack.

    Args:
        data_pack: mapping, the keys 'unzip_dir' and 'pack_url' are read
        conn: database connection, used for running the script and for
            logging the script insertion

    Returns:
        True if the script ran with exit code 0, else False.
    """
    from Gnumed.pycommon import gmPsql
    runner = gmPsql.Psql(conn)
    script_fname = os.path.join(data_pack['unzip_dir'], 'install-data-pack.sql')
    if runner.run(script_fname) != 0:
        _log.error('error installing data pack: %s', data_pack)
        return False

    # record the successful installation
    log_args = {'name': data_pack['pack_url'], 'ver': 'current'}
    cursor = conn.cursor()
    cursor.execute('select gm.log_script_insertion(%(name)s, %(ver)s)', log_args)
    cursor.close()
    conn.commit()
    return True
Expand source code
def mirror_url(url:str, base_dir:str=None, verbose:bool=False) -> str:
    """Mirror the web*page* at _url_, non-recursively.

    Note: Not for mirroring a *site* (recursively).

    Args:
        url: the URL to mirror
        base_dir: where to store the page and its prerequisites, sandbox dir under tmp_dir if None
        verbose: adds --debug to the wget call and is passed on to the process runner

    Returns:
        The base directory on success, None otherwise.
    """
    assert (url is not None), '<url> must not be None'

    _log.debug('mirroring: %s', url)
    if base_dir is None:
        # derive the sandbox name from the URL sans scheme
        sans_scheme = url.split('://')[-1]
        flattened = sans_scheme.strip(':').strip('/').replace('/', '#')
        sandbox_prefix = gmTools.fname_sanitize(flattened)
        base_dir = gmTools.mk_sandbox_dir(prefix=sandbox_prefix + '-')
    _log.debug('base dir: %s', base_dir)
    # NOTE(review): if the process runner does not go through a shell
    # the quotes in --user-agent="" reach wget literally -- confirm
    wget_cmd = ['wget', '--directory-prefix=%s' % base_dir]
    wget_cmd += [
        '--no-remove-listing',
        '--timestamping',
        '--page-requisites',
        '--continue',
        '--convert-links',
        '--user-agent=""',
        '--execute', 'robots=off',
        '--wait=1'
    ]
    if verbose:
        wget_cmd += ['--debug']
    wget_cmd += [url]
    success, ret_code, STDOUT = gmShellAPI.run_process(cmd_line=wget_cmd, timeout=15, verbose=verbose)
    return base_dir if success else None
Note: Not for mirroring a site (recursively).
Args
url- the URL to mirror
base_dir- where to store the page and its prerequisites, sandbox dir under tmp_dir if None
def open_url_in_browser(url: str = None, new: int = 2, autoraise: bool = True) ‑> bool-
Expand source code
def open_url_in_browser(url:str=None, new:int=2, autoraise:bool=True) -> bool:
    """Open an URL in a browser.

    Args:
        url: URL to open
        new: whether to open a new browser, a new tab in a running browser, or a tab OR a browser, 2 = open new tab if possible
        autoraise: passed on to webbrowser.open()

    Returns:
        False if calling the browser raised an error, True otherwise,
        including when there is no URL to open.
    """
    if url:
        try:
            webbrowser.open(url, new=new, autoraise=autoraise)
        except (webbrowser.Error, OSError, UnicodeEncodeError):
            _log.exception('error calling browser with url=[%s]', url)
            return False

    return True
Args
url- URL to open
new- whether to open a new browser, a new tab in a running browser, or a tab OR a browser, 2 = open new tab if possible
def open_urls_in_browser(urls: list[str] = None,
new: int = 2,
autoraise: bool = True,
initial_delay: int = 200,
*args,
**kwargs) ‑> bool-
Expand source code
def open_urls_in_browser(urls:list[str]=None, new:int=2, autoraise:bool=True, initial_delay:int=200, *args, **kwargs) -> bool:
    """Open URLs in a browser.

    Args:
        urls: URLs to open
        new: whether to open a new browser, a new tab in a running browser, or a tab OR a browser (see Python docs), 2 = open new tab if possible
        autoraise: passed on to open_url_in_browser()
        initial_delay: milliseconds to sleep after opening first URL (so that the browser may start up)

    Returns:
        Always True, failures to open single URLs are not reported.
    """
    if not urls:
        return True

    # negative delays count as zero: can't timewarp just yet
    startup_pause = max(initial_delay, 0) / 1000
    for position, url in enumerate(urls):
        open_url_in_browser(url=url, new=new, autoraise=autoraise)
        # only the first URL is followed by an actual pause
        time.sleep(startup_pause if position == 0 else 0)
    return True
Args
urls- URLs to open
new- whether to open a new browser, a new tab in a running browser, or a tab OR a browser (see Python docs), 2 = open new tab if possible
initial_delay- milliseconds to sleep after opening first URL (so that the browser may start up)
def send_email(sender=None, receiver=None, email=None, server=None, auth=None, debug=False)-
Expand source code
def send_email(sender=None, receiver=None, email=None, server=None, auth=None, debug=False):
    """Send an already composed email via SMTP.

    Args:
        sender: envelope sender, default_mail_sender if None
        receiver: list of envelope recipients, [default_mail_receiver] if None
        email: the message, its .as_string() is what gets sent
        server: SMTP server, default_mail_server if None
        auth: None, or a mapping with keys 'user' and 'password' for logging in
        debug: passed to the SMTP session as debug level

    Returns:
        False if an SMTP error occurred, True otherwise. Refused
        recipients are logged but do not turn the result into False.

    Raises:
        ValueError: if <email> is None
    """
    if email is None:
        raise ValueError('<email> is None, cannot send')

    if sender is None:
        sender = default_mail_sender
    if receiver is None:
        receiver = [default_mail_receiver]
    if server is None:
        server = default_mail_server

    import smtplib
    failed = False
    refused = []
    try:
        session = smtplib.SMTP(server)
        try:
            session.set_debuglevel(debug)
            try:
                session.starttls()
            except smtplib.SMTPException:
                # best effort: carry on unencrypted
                _log.error('cannot enable TLS on [%s]', server)
            session.ehlo()
            if auth is not None:
                session.login(auth['user'], auth['password'])
            refused = session.sendmail(sender, receiver, email.as_string())
            session.quit()
        finally:
            # do not leave the connection open if anything above
            # raised, harmless after a successful .quit()
            session.close()
    except smtplib.SMTPException:
        failed = True
        _log.exception('cannot send email')
        gmLog2.log_stack_trace()
    if len(refused) > 0:
        _log.error("refused recipients: %s" % refused)
    if failed:
        return False

    return True
Expand source code
def unzip_data_pack(filename=None):
    """Unzip a data pack next to the zip file.

    The target directory is <filename> minus its extension, it is
    created before the archive is opened.

    Args:
        filename: the zip file to extract

    Returns:
        The directory the data pack was extracted into, or None if
        <filename> cannot be opened as a zip file.
    """
    unzip_dir = os.path.splitext(filename)[0]
    _log.debug('unzipping data pack into [%s]', unzip_dir)
    gmTools.mkdir(unzip_dir)
    try:
        data_pack = zipfile.ZipFile(filename, 'r')
    except zipfile.BadZipFile:
        _log.exception('cannot unzip data pack [%s]', filename)
        gmLog2.log_stack_trace()
        return None

    # make sure the archive gets closed even if extraction fails
    with data_pack:
        data_pack.extractall(unzip_dir)
    return unzip_dir