
# Billing code.
#
# Copyright: authors
#
# (residue of the documentation page this source was taken from: "Expand source code")
# -*- coding: utf-8 -*-
"""Billing code.

Copyright: authors
"""
__author__ = "Nico Latzer <>, Karsten Hilbert <>"
__license__ = 'GPL v2 or later (details at'

import sys
import zlib
import decimal
import logging

if __name__ == '__main__':
        sys.path.insert(0, '../../')
        _ = lambda x:x
from Gnumed.pycommon import gmPG2
from Gnumed.pycommon import gmTools
from Gnumed.pycommon import gmDateTime
from Gnumed.pycommon import gmBusinessDBObject

# NOTE(review): the package path was missing from these two imports
# ("from import ..."), which is a syntax error; restored as Gnumed.business,
# confirm against the upstream file
from Gnumed.business import gmDemographicRecord
from Gnumed.business import gmDocuments

# logger shared by all billing code in this module
_log = logging.getLogger('gm.bill')

# default: old style
# used by generate_invoice_id() when no (or an empty) template is passed in
DEFAULT_INVOICE_ID_TEMPLATE = 'GM%(pk_pat)s / %(date)s / %(time)s'

# billables
# query template, the %s is replaced by a WHERE condition by its users
_SQL_get_billable_fields = "SELECT * FROM ref.v_billables WHERE %s"

class cBillable(gmBusinessDBObject.cBusinessDBObject):
        """Items which can be billed to patients.

        Payload rows are read from the view ref.v_billables, changes
        are written back to the table ref.billable.
        """

        _cmd_fetch_payload = _SQL_get_billable_fields % "pk_billable = %s"
        # The UPDATE only matches while the row still carries the xmin found
        # in the payload (xmin_billable), the new xmin is handed back.
        # NOTE(review): WHERE / AND / RETURNING and the closing delimiters were
        # missing from the source at hand and have been restored.
        _cmds_store_payload = [
                """UPDATE ref.billable SET
                                fk_data_source = %(pk_data_source)s,
                                code = %(billable_code)s,
                                term = %(billable_description)s,
                                comment = gm.nullify_empty_string(%(comment)s),
                                amount = %(raw_amount)s,
                                currency = %(currency)s,
                                vat_multiplier = %(vat_multiplier)s,
                                active = %(active)s
                                --, discountable = %(discountable)s
                        WHERE
                                pk = %(pk_billable)s
                                        AND
                                xmin = %(xmin_billable)s
                        RETURNING
                                xmin AS xmin_billable
                """
        ]

        # NOTE(review): list content was missing, rebuilt from the payload
        # fields the UPDATE above writes
        _updatable_fields = [
                'billable_code',
                'billable_description',
                'raw_amount',
                'vat_multiplier',
                'comment',
                'currency',
                'active',
                'pk_data_source'
        ]

        def format(self):
                """Return a multi-line, human readable description of this billable."""
                # NOTE(review): the payload fields 'active', 'pk_billable',
                # 'billable_code', 'billable_description', 'catalog_short',
                # 'catalog_version' and 'catalog_long' were on lines missing
                # from the source at hand -- verify against ref.v_billables
                txt = '%s                                    [#%s]\n\n' % (
                        gmTools.bool2subst (
                                self._payload[self._idx['active']],
                                _('Active billable item'),
                                _('Inactive billable item')
                        ),
                        self._payload[self._idx['pk_billable']]
                )
                txt += ' %s: %s\n' % (
                        self._payload[self._idx['billable_code']],
                        self._payload[self._idx['billable_description']]
                )
                txt += _(' %(curr)s%(raw_val)s + %(perc_vat)s%% VAT = %(curr)s%(val_w_vat)s\n') % {
                        'curr': self._payload[self._idx['currency']],
                        'raw_val': self._payload[self._idx['raw_amount']],
                        # the multiplier is a fraction, show it as percent
                        'perc_vat': self._payload[self._idx['vat_multiplier']] * 100,
                        'val_w_vat': self._payload[self._idx['amount_with_vat']]
                }
                txt += ' %s %s%s (%s)' % (
                        self._payload[self._idx['catalog_short']],
                        self._payload[self._idx['catalog_version']],
                        gmTools.coalesce(self._payload[self._idx['catalog_language']], '', ' - %s'),
                        self._payload[self._idx['catalog_long']]
                )
                txt += gmTools.coalesce(self._payload[self._idx['comment']], '', '\n %s')

                return txt

        def _get_is_in_use(self):
                """Return whether any row in bill.bill_item references this billable."""
                cmd = 'SELECT EXISTS(SELECT 1 FROM bill.bill_item WHERE fk_billable = %(pk)s LIMIT 1)'
                rows, idx = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': {'pk': self._payload[self._idx['pk_billable']]}}])
                return rows[0][0]

        is_in_use = property(_get_is_in_use)

def get_billables(active_only=True, order_by=None, return_pks=False):
        """Retrieve billables from ref.v_billables.

        Args:
                active_only: only return rows for which "active IS true"
                order_by: SQL sort expression (without "ORDER BY"), defaults to
                        catalog_long, catalog_version, billable_code
                return_pks: return a list of pk_billable values rather than cBillable instances

        Returns:
                A list of cBillable instances, or of primary keys.
        """
        # the "else:" branches had been lost, which doubled the ORDER BY
        # prefix on the default and left <where> wrong or unbound
        if order_by is None:
                order_by = ' ORDER BY catalog_long, catalog_version, billable_code'
        else:
                order_by = ' ORDER BY %s' % order_by

        if active_only:
                where = 'active IS true'
        else:
                where = 'true'

        cmd = (_SQL_get_billable_fields % where) + order_by
        rows, idx = gmPG2.run_ro_queries(queries = [{'cmd': cmd}], get_col_idx = True)
        if return_pks:
                return [ r['pk_billable'] for r in rows ]
        return [ cBillable(row = {'data': r, 'idx': idx, 'pk_field': 'pk_billable'}) for r in rows ]

def create_billable(code=None, term=None, data_source=None, return_existing=False):
        """Create a billable unless one with the same code, term and data source exists.

        Args:
                code: code of the billable, must be a string (it is stripped)
                term: description of the billable, must be a string (it is stripped)
                data_source: value for ref.billable.fk_data_source
                return_existing: if nothing was inserted, look up and return the existing billable

        Returns:
                The new cBillable, or the pre-existing one if <return_existing>
                is True, or None.
        """
        args = {
                'code': code.strip(),
                'term': term.strip(),
                'data_src': data_source
        }
        # NOTE(review): the SELECT list, WHERE / AND and the closing
        # delimiters were missing from the source at hand and have been restored
        cmd = """
                INSERT INTO ref.billable (code, term, fk_data_source)
                SELECT
                        %(code)s,
                        %(term)s,
                        %(data_src)s
                WHERE NOT EXISTS (
                        SELECT 1 FROM ref.billable
                        WHERE
                                code = %(code)s
                                        AND
                                term = %(term)s
                                        AND
                                fk_data_source = %(data_src)s
                )
                RETURNING pk"""
        rows, idx = gmPG2.run_rw_queries(queries = [{'cmd': cmd, 'args': args}], get_col_idx = False, return_data = True)
        if len(rows) > 0:
                return cBillable(aPK_obj = rows[0]['pk'])

        if not return_existing:
                return None

        cmd = """
                SELECT * FROM ref.v_billables
                WHERE
                        code = %(code)s
                                AND
                        term = %(term)s
                                AND
                        pk_data_source = %(data_src)s
        """
        rows, idx = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}], get_col_idx = True)
        if len(rows) == 0:
                # nothing inserted and nothing found: do not fail with an IndexError
                _log.error('cannot find existing billable: %s', args)
                return None

        return cBillable(row = {'data': rows[0], 'idx': idx, 'pk_field': 'pk_billable'})

def delete_billable(pk_billable=None):
        """Delete a billable unless a row in bill.bill_item references it.

        Args:
                pk_billable: primary key of the row in ref.billable
        """
        # NOTE(review): WHERE / AND and the closing delimiters were missing
        # from the source at hand and have been restored
        cmd = """
                DELETE FROM ref.billable
                WHERE
                        pk = %(pk)s
                                AND
                        NOT EXISTS (
                                SELECT 1 FROM bill.bill_item WHERE fk_billable = %(pk)s
                        )
        """
        args = {'pk': pk_billable}
        gmPG2.run_rw_queries(queries = [{'cmd': cmd, 'args': args}])

# bill items
# query template, the %s is replaced by a WHERE condition by its users
_SQL_get_bill_item_fields = u"SELECT * FROM bill.v_bill_items WHERE %s"

class cBillItem(gmBusinessDBObject.cBusinessDBObject):
        """An item which is, or is to be, billed.

        Payload rows are read from the view bill.v_bill_items, changes
        are written back to the table bill.bill_item.
        """

        _cmd_fetch_payload = _SQL_get_bill_item_fields % u"pk_bill_item = %s"
        # The UPDATE only matches while the row still carries the xmin found
        # in the payload (xmin_bill_item), the new xmin is handed back.
        # NOTE(review): WHERE / AND / RETURNING and the closing delimiters were
        # missing from the source at hand and have been restored.
        _cmds_store_payload = [
                """UPDATE bill.bill_item SET
                                fk_provider = %(pk_provider)s,
                                fk_encounter = %(pk_encounter_to_bill)s,
                                date_to_bill = %(raw_date_to_bill)s,
                                description = gm.nullify_empty_string(%(item_detail)s),
                                net_amount_per_unit = %(net_amount_per_unit)s,
                                currency = gm.nullify_empty_string(%(currency)s),
                                fk_bill = %(pk_bill)s,
                                unit_count = %(unit_count)s,
                                amount_multiplier = %(amount_multiplier)s
                        WHERE
                                pk = %(pk_bill_item)s
                                        AND
                                xmin = %(xmin_bill_item)s
                        RETURNING
                                xmin AS xmin_bill_item
                """
        ]

        # NOTE(review): list content was missing, rebuilt from the payload
        # fields the UPDATE above writes
        _updatable_fields = [
                'pk_provider',
                'pk_encounter_to_bill',
                'raw_date_to_bill',
                'item_detail',
                'net_amount_per_unit',
                'currency',
                'pk_bill',
                'unit_count',
                'amount_multiplier'
        ]

        def format(self):
                """Return a multi-line, human readable description of this bill item."""
                # NOTE(review): the payload fields 'catalog_short',
                # 'catalog_version', 'pk_bill_item', 'billable_code',
                # 'billable_description', 'billable_comment', 'date_to_bill'
                # and the use of gmTools.u_numero were on lines missing from
                # the source at hand -- verify against bill.v_bill_items
                txt = '%s (%s %s%s)         [#%s]\n' % (
                        gmTools.bool2subst (
                                self._payload[self._idx['pk_bill']] is None,
                                _('Open item'),
                                _('Billed item'),
                        ),
                        self._payload[self._idx['catalog_short']],
                        self._payload[self._idx['catalog_version']],
                        gmTools.coalesce(self._payload[self._idx['catalog_language']], '', ' - %s'),
                        self._payload[self._idx['pk_bill_item']]
                )
                txt += ' %s: %s\n' % (
                        self._payload[self._idx['billable_code']],
                        self._payload[self._idx['billable_description']]
                )
                txt += gmTools.coalesce (
                        self._payload[self._idx['billable_comment']],
                        '',
                        '  (%s)\n',
                )
                txt += gmTools.coalesce (
                        self._payload[self._idx['item_detail']],
                        '',
                        _(' Details: %s\n'),
                )

                txt += '\n'
                txt += _(' %s of units: %s\n') % (
                        gmTools.u_numero,
                        self._payload[self._idx['unit_count']]
                )
                txt += _(' Amount per unit: %(curr)s%(val_p_unit)s (%(cat_curr)s%(cat_val)s per catalog)\n') % {
                        'curr': self._payload[self._idx['currency']],
                        'val_p_unit': self._payload[self._idx['net_amount_per_unit']],
                        'cat_curr': self._payload[self._idx['billable_currency']],
                        'cat_val': self._payload[self._idx['billable_amount']]
                }
                txt += _(' Amount multiplier: %s\n') % self._payload[self._idx['amount_multiplier']]
                txt += _(' VAT would be: %(perc_vat)s%% %(equals)s %(curr)s%(vat)s\n') % {
                        # the multiplier is a fraction, show it as percent
                        'perc_vat': self._payload[self._idx['vat_multiplier']] * 100,
                        'equals': gmTools.u_corresponds_to,
                        'curr': self._payload[self._idx['currency']],
                        'vat': self._payload[self._idx['vat']]
                }

                txt += '\n'
                txt += _(' Charge date: %s') % gmDateTime.pydt_strftime (
                        self._payload[self._idx['date_to_bill']],
                        '%Y %b %d',
                        accuracy = gmDateTime.acc_days
                )
                bill = self.bill
                if bill is not None:
                        txt += _('\n On bill: %s') % bill['invoice_id']

                return txt

        def _get_billable(self):
                """Return the cBillable this item refers to."""
                return cBillable(aPK_obj = self._payload[self._idx['pk_billable']])

        billable = property(_get_billable)

        def _get_bill(self):
                """Return the cBill this item is on, or None if it is not on a bill."""
                if self._payload[self._idx['pk_bill']] is None:
                        return None
                return cBill(aPK_obj = self._payload[self._idx['pk_bill']])

        bill = property(_get_bill)

        def _get_is_in_use(self):
                """An item counts as in use once it is linked to a bill."""
                return self._payload[self._idx['pk_bill']] is not None

        is_in_use = property(_get_is_in_use)
def get_bill_items(pk_patient=None, non_invoiced_only=False, return_pks=False):
        """Retrieve the bill items of a patient from bill.v_bill_items.

        Args:
                pk_patient: primary key of the patient
                non_invoiced_only: only return items which are not on a bill (pk_bill IS NULL)
                return_pks: return a list of pk_bill_item values rather than cBillItem instances

        Returns:
                A list of cBillItem instances, or of primary keys.
        """
        # the "else:" had been lost, so the non-invoiced filter was always
        # overwritten and <cmd> stayed unbound when the flag was False
        if non_invoiced_only:
                cmd = _SQL_get_bill_item_fields % u"pk_patient = %(pat)s AND pk_bill IS NULL"
        else:
                cmd = _SQL_get_bill_item_fields % u"pk_patient = %(pat)s"
        args = {'pat': pk_patient}
        rows, idx = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}], get_col_idx = True)
        if return_pks:
                return [ r['pk_bill_item'] for r in rows ]
        return [ cBillItem(row = {'data': r, 'idx': idx, 'pk_field': 'pk_bill_item'}) for r in rows ]

def create_bill_item(pk_encounter=None, pk_billable=None, pk_staff=None):
        """Create a bill item for a billable within an encounter.

        Amount per unit and currency are copied from the billable.

        Args:
                pk_encounter: primary key of the encounter to bill
                pk_billable: primary key of the billable the item is based on
                pk_staff: primary key of the provider

        Returns:
                The newly created cBillItem.
        """
        billable = cBillable(aPK_obj = pk_billable)
        # NOTE(review): the column and value lists were missing from the source
        # at hand; restored from the keys of <args> and the bill.bill_item
        # columns used elsewhere in this module
        cmd = """
                INSERT INTO bill.bill_item (
                        fk_provider,
                        fk_encounter,
                        net_amount_per_unit,
                        currency,
                        fk_billable
                ) VALUES (
                        %(staff)s,
                        %(enc)s,
                        %(val)s,
                        %(curr)s,
                        %(billable)s
                )
                RETURNING pk"""
        args = {
                'staff': pk_staff,
                'enc': pk_encounter,
                'val': billable['raw_amount'],
                'curr': billable['currency'],
                'billable': pk_billable
        }
        rows, idx = gmPG2.run_rw_queries(queries = [{'cmd': cmd, 'args': args}], return_data = True)
        return cBillItem(aPK_obj = rows[0][0])

def delete_bill_item(link_obj=None, pk_bill_item=None):
        """Delete a bill item, but only while it is not linked to a bill.

        Args:
                link_obj: passed through to gmPG2.run_rw_queries()
                pk_bill_item: primary key of the row in bill.bill_item
        """
        query = {
                'cmd': 'DELETE FROM bill.bill_item WHERE pk = %(pk)s AND fk_bill IS NULL',
                'args': {'pk': pk_bill_item}
        }
        gmPG2.run_rw_queries(link_obj = link_obj, queries = [query])

# bills
# query template, the %s is replaced by a WHERE condition by its users
_SQL_get_bill_fields = """SELECT * FROM bill.v_bills WHERE %s"""

class cBill(gmBusinessDBObject.cBusinessDBObject):
        """Represents a bill.

        Payload rows are read from the view bill.v_bills, changes are
        written back to the table bill.bill.
        """

        _cmd_fetch_payload = _SQL_get_bill_fields % "pk_bill = %s"
        # The UPDATE only matches while the row still carries the xmin found
        # in the payload (xmin_bill), pk and the new xmin are handed back.
        # NOTE(review): WHERE / AND / RETURNING and the closing delimiters were
        # missing from the source at hand and have been restored.
        _cmds_store_payload = [
                """UPDATE bill.bill SET
                                invoice_id = gm.nullify_empty_string(%(invoice_id)s),
                                close_date = %(close_date)s,
                                apply_vat = %(apply_vat)s,
                                comment = gm.nullify_empty_string(%(comment)s),
                                fk_receiver_identity = %(pk_receiver_identity)s,
                                fk_receiver_address = %(pk_receiver_address)s,
                                fk_doc = %(pk_doc)s
                        WHERE
                                pk = %(pk_bill)s
                                        AND
                                xmin = %(xmin_bill)s
                        RETURNING
                                pk as pk_bill,
                                xmin as xmin_bill
                """
        ]
        # NOTE(review): list content was missing, rebuilt from the payload
        # fields the UPDATE above writes
        _updatable_fields = [
                'invoice_id',
                'pk_receiver_identity',
                'close_date',
                'apply_vat',
                'comment',
                'pk_receiver_address',
                'pk_doc'
        ]

        def format(self, include_receiver=True, include_doc=True):
                """Return a multi-line, human readable description of this bill.

                Args:
                        include_receiver: append receiver identity and address, if any
                        include_doc: include a line on the invoice document
                """
                # NOTE(review): closing brackets, two "else:" lines and the
                # arguments reading 'pk_bill', 'close_date' (for the "Closed"
                # line) and 'pk_receiver_identity' were missing from the
                # source at hand and have been restored -- verify
                txt = '%s                       [#%s]\n' % (
                        gmTools.bool2subst (
                                (self._payload[self._idx['close_date']] is None),
                                _('Open bill'),
                                _('Closed bill')
                        ),
                        self._payload[self._idx['pk_bill']]
                )
                txt += _(' Invoice ID: %s\n') % self._payload[self._idx['invoice_id']]

                if self._payload[self._idx['close_date']] is not None:
                        txt += _(' Closed: %s\n') % gmDateTime.pydt_strftime (
                                self._payload[self._idx['close_date']],
                                '%Y %b %d',
                                accuracy = gmDateTime.acc_days
                        )

                if self._payload[self._idx['comment']] is not None:
                        txt += _(' Comment: %s\n') % self._payload[self._idx['comment']]

                txt += _(' Bill value: %(curr)s%(val)s\n') % {
                        'curr': self._payload[self._idx['currency']],
                        'val': self._payload[self._idx['total_amount']]
                }

                # apply_vat is tri-state: None (undecided), True, anything else (no VAT)
                if self._payload[self._idx['apply_vat']] is None:
                        txt += _(' VAT: undecided\n')
                elif self._payload[self._idx['apply_vat']] is True:
                        txt += _(' VAT: %(perc_vat)s%% %(equals)s %(curr)s%(vat)s\n') % {
                                'perc_vat': self._payload[self._idx['percent_vat']],
                                'equals': gmTools.u_corresponds_to,
                                'curr': self._payload[self._idx['currency']],
                                'vat': self._payload[self._idx['total_vat']]
                        }
                        txt += _(' Value + VAT: %(curr)s%(val)s\n') % {
                                'curr': self._payload[self._idx['currency']],
                                'val': self._payload[self._idx['total_amount_with_vat']]
                        }
                else:
                        txt += _(' VAT: does not apply\n')

                if self._payload[self._idx['pk_bill_items']] is None:
                        txt += _(' Items billed: 0\n')
                else:
                        txt += _(' Items billed: %s\n') % len(self._payload[self._idx['pk_bill_items']])
                if include_doc:
                        txt += _(' Invoice: %s\n') % (
                                gmTools.bool2subst (
                                        self._payload[self._idx['pk_doc']] is None,
                                        _('not available'),
                                        '#%s' % self._payload[self._idx['pk_doc']]
                                )
                        )
                txt += _(' Patient: #%s\n') % self._payload[self._idx['pk_patient']]
                if include_receiver:
                        txt += gmTools.coalesce (
                                self._payload[self._idx['pk_receiver_identity']],
                                '',
                                _(' Receiver: #%s\n')
                        )
                        if self._payload[self._idx['pk_receiver_address']] is not None:
                                txt += '\n '.join(gmDemographicRecord.get_patient_address(pk_patient_address = self._payload[self._idx['pk_receiver_address']]).format())

                return txt

        def add_items(self, items=None):
                """Requires no pending changes within the bill itself."""
                # should check for item consistency first
                conn = gmPG2.get_connection(readonly = False)
                for item in items:
                        item['pk_bill'] = self._payload[self._idx['pk_bill']]
                        # NOTE(review): only "= conn)" survived of this call in
                        # the source at hand, restored as item.save(conn = conn)
                        item.save(conn = conn)
                # NOTE(review): no commit is visible in the source at hand; it
                # is presumably needed since the items are saved on a
                # connection opened here -- confirm against upstream
                conn.commit()
                self.refetch_payload()          # make sure aggregates are re-filled from view

        def _get_bill_items(self):
                """Return the items on this bill as cBillItem instances."""
                pks = self._payload[self._idx['pk_bill_items']]
                if pks is None:
                        # format() shows that the payload holds None for a bill without items
                        return []
                return [ cBillItem(aPK_obj = pk) for pk in pks ]

        bill_items = property(_get_bill_items)

        def _get_invoice(self):
                """Return the invoice document (gmDocuments.cDocument), or None if there is none."""
                if self._payload[self._idx['pk_doc']] is None:
                        return None
                return gmDocuments.cDocument(aPK_obj = self._payload[self._idx['pk_doc']])

        invoice = property(_get_invoice)

        def _get_address(self):
                """Return the receiver address, or None if none is set on the bill."""
                if self._payload[self._idx['pk_receiver_address']] is None:
                        return None
                return gmDemographicRecord.get_address_from_patient_address_pk (
                        pk_patient_address = self._payload[self._idx['pk_receiver_address']]
                )

        address = property(_get_address)

        def _get_default_address(self):
                """Return the patient's address of type 'billing'."""
                return gmDemographicRecord.get_patient_address_by_type (
                        pk_patient = self._payload[self._idx['pk_patient']],
                        adr_type = 'billing'
                )

        default_address = property(_get_default_address)

        def _get_home_address(self):
                """Return the patient's address of type 'home'."""
                return gmDemographicRecord.get_patient_address_by_type (
                        pk_patient = self._payload[self._idx['pk_patient']],
                        adr_type = 'home'
                )

        home_address = property(_get_home_address)

        def set_missing_address_from_default(self):
                """Set the receiver address from the billing or else the home address.

                Returns:
                        True if an address was already set, False if neither a
                        billing nor a home address is available, else the
                        result of save_payload().
                """
                if self._payload[self._idx['pk_receiver_address']] is not None:
                        return True
                adr = self.default_address
                if adr is None:
                        adr = self.home_address
                        if adr is None:
                                return False
                self['pk_receiver_address'] = adr['pk_lnk_person_org_address']
                return self.save_payload()

def get_bills(order_by=None, pk_patient=None, return_pks=False):
        """Retrieve bills from bill.v_bills.

        Args:
                order_by: SQL sort expression (without "ORDER BY"), None for unsorted
                pk_patient: only return bills of this patient, None for all patients
                return_pks: return a list of pk_bill values rather than cBill instances

        Returns:
                A list of cBill instances, or of primary keys.
        """
        args = {'pat': pk_patient}
        where_parts = ['true']

        if pk_patient is not None:
                where_parts.append('pk_patient = %(pat)s')

        # the "else:" had been lost, which dropped any sort expression passed in
        if order_by is None:
                order_by = ''
        else:
                order_by = ' ORDER BY %s' % order_by

        cmd = (_SQL_get_bill_fields % ' AND '.join(where_parts)) + order_by
        rows, idx = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}], get_col_idx = True)
        if return_pks:
                return [ r['pk_bill'] for r in rows ]
        return [ cBill(row = {'data': r, 'idx': idx, 'pk_field': 'pk_bill'}) for r in rows ]

def get_bills4document(pk_document=None):
        """Return, as cBill instances, the bills whose pk_doc equals <pk_document>."""
        query = {
                'cmd': _SQL_get_bill_fields % 'pk_doc = %(pk_doc)s',
                'args': {'pk_doc': pk_document}
        }
        rows, idx = gmPG2.run_ro_queries(queries = [query], get_col_idx = True)
        bills = []
        for row in rows:
                bills.append(cBill(row = {'data': row, 'idx': idx, 'pk_field': 'pk_bill'}))
        return bills

def create_bill(conn=None, invoice_id=None):
        """Create a bill.

        Args:
                conn: passed to gmPG2.run_rw_queries() as <link_obj>
                invoice_id: invoice ID of the new bill, an empty string is stored as NULL

        Returns:
                The newly created cBill.
        """
        args = {'inv_id': invoice_id}
        # the closing quotes of this SQL string had been lost
        cmd = """
                INSERT INTO bill.bill (invoice_id)
                VALUES (gm.nullify_empty_string(%(inv_id)s))
                RETURNING pk
        """
        rows, idx = gmPG2.run_rw_queries(link_obj = conn, queries = [{'cmd': cmd, 'args': args}], return_data = True, get_col_idx = False)

        return cBill(aPK_obj = rows[0]['pk'])

def delete_bill(link_obj=None, pk_bill=None):
        """Delete the row with primary key <pk_bill> from bill.bill.

        Args:
                link_obj: passed through to gmPG2.run_rw_queries()
                pk_bill: primary key of the bill

        Returns:
                Always True.
        """
        gmPG2.run_rw_queries (
                link_obj = link_obj,
                queries = [{
                        'cmd': "DELETE FROM bill.bill WHERE pk = %(pk)s",
                        'args': {'pk': pk_bill}
                }]
        )
        return True

def get_bill_receiver(pk_patient=None):
        """Stub, does nothing and returns None.

        NOTE(review): the source at hand shows no body for this
        function, which is a syntax error -- confirm the intended
        implementation against upstream.
        """
        return None

def generate_invoice_id(template=None, pk_patient=None, person=None, date_format='%Y-%m-%d', time_format='%H%M%S'):
        """Generate invoice ID string, based on template.

        No template given -> generate old style fixed format invoice ID.

        Placeholders available to <template>:
                %(pk_pat)s
                %(date)s
                %(time)s
                        if included, #counter# is not *needed* (but still possible)
                %(firstname)s
                %(lastname)s
                %(dob)s
                #counter#
                        will be replaced by a counter, counting up from 1 until the invoice id is unique, max 999999

        Args:
                template: %-style template, None or empty selects DEFAULT_INVOICE_ID_TEMPLATE
                        (which also resets both formats to their defaults)
                pk_patient: primary key of the patient, mutually exclusive with <person>
                person: patient object providing .ID, ['firstnames'], ['lastnames'],
                        and .get_formatted_dob(), mutually exclusive with <pk_patient>
                date_format: strftime format for %(date)s and %(dob)s
                time_format: strftime format for %(time)s

        Returns:
                The invoice ID as a string.
        """
        assert (None in [pk_patient, person]), u'either of <pk_patient> or <person> can be defined, but not both'

        if (template is None) or (template.strip() == ''):
                template = DEFAULT_INVOICE_ID_TEMPLATE
                date_format = '%Y-%m-%d'
                time_format = '%H%M%S'
        template = template.strip()
        _log.debug('invoice ID template: %s', template)
        if pk_patient is None:
                if person is not None:
                        pk_patient = person.ID
        now = gmDateTime.pydt_now_here()
        data = {}
        data['pk_pat'] = gmTools.coalesce(pk_patient, '?')
        data['date'] = gmDateTime.pydt_strftime(now, date_format).strip()
        data['time'] = gmDateTime.pydt_strftime(now, time_format).strip()
        if person is None:
                data['firstname'] = '?'
                data['lastname'] = '?'
                data['dob'] = '?'
        else:
                # this "else:" had been lost, which made the '?' defaults dead
                # code and crashed on <person> being None
                data['firstname'] = person['firstnames'].replace(' ', gmTools.u_space_as_open_box).strip()
                data['lastname'] = person['lastnames'].replace(' ', gmTools.u_space_as_open_box).strip()
                data['dob'] = person.get_formatted_dob (
                        format = date_format,
                        none_string = '?',
                        honor_estimation = False
                )
        candidate_invoice_id = template % data
        if '#counter#' not in candidate_invoice_id:
                # a timestamp is considered sufficient for uniqueness
                if '%(time)s' in template:
                        return candidate_invoice_id

                # neither counter nor time: enforce a counter (yields " [#<n>]")
                candidate_invoice_id = candidate_invoice_id + ' [##counter#]'

        _log.debug('invoice id candidate: %s', candidate_invoice_id)
        # get existing invoice IDs consistent with candidate
        # (raw strings: \s and \d are regex escapes, not Python string escapes)
        search_term = r'^\s*%s\s*$' % gmPG2.sanitize_pg_regex(expression = candidate_invoice_id).replace('#counter#', r'\d+')
        cmd = 'SELECT invoice_id FROM bill.bill WHERE invoice_id ~* %(search_term)s UNION ALL SELECT invoice_id FROM audit.log_bill WHERE invoice_id ~* %(search_term)s'
        args = {'search_term': search_term}
        rows, idx = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}])
        if len(rows) == 0:
                return candidate_invoice_id.replace('#counter#', '1')

        # a set keeps each of the up to <counter_max> membership tests cheap
        existing_invoice_ids = { r['invoice_id'].strip() for r in rows }
        counter = None
        counter_max = 999999
        for candidate_counter in range(1, counter_max + 1):
                candidate = candidate_invoice_id.replace('#counter#', '%s' % candidate_counter)
                if candidate not in existing_invoice_ids:
                        counter = candidate_counter
                        # stop at the FIRST free counter rather than walking the entire range
                        break
        if counter is None:
                # exhausted the range, unlikely (1 million bills are possible
                # even w/o any other invoice ID data) but technically possible
                _log.debug('exhausted uniqueness space of [%s] invoice IDs per template', counter_max)
                counter = '>%s[%s]' % (counter_max, data['time'])

        return candidate_invoice_id.replace('#counter#', '%s' % counter)

def __generate_invoice_id_lock_token(invoice_id):
        """Turn invoice ID into integer token for PG level locking.

        The adler32 checksum of the invoice ID is used both as part of
        the data and as the start value for the crc32 which is returned.

        CAUTION: This is NOT compatible with any of 1.8 or below.
        Do NOT attempt to run this against a v22 database at the
        risk of duplicate invoice IDs.

        Args:
                invoice_id: the invoice ID, it is %s-formatted into the checksum data

        Returns:
                The crc32 value as an integer.
        """
        _log.debug('invoice ID: %s', invoice_id)
        data4adler32 = 'adler32---[%s]---[%s]' % (invoice_id, invoice_id)
        adler32 = zlib.adler32(bytes(data4adler32, 'utf8'))
        _log.debug('adler32: %s', adler32)
        data4crc32 = 'crc32---[%s]---[adler32:%s]' % (invoice_id, adler32)
        _log.debug('data for CRC 32: %s', data4crc32)
        crc32 = zlib.crc32(bytes(data4crc32, 'utf8'), adler32)
        _log.debug('CRC 32: %s', crc32)
        return crc32

def lock_invoice_id(invoice_id):
        """Lock an invoice ID.

        The lock taken is an exclusive advisory lock in PostgreSQL.

        Because the data is short _and_ crc32/adler32 are fairly
        weak we assume that collisions can be created "easily".
        Therefore we apply both algorithms concurrently.

        NOT compatible with anything 1.8 or below.

        Returns:
                True if the lock was taken, False if not (including
                when the query raised a ProgrammingError).
        """
        _log.debug('locking invoice ID: %s', invoice_id)
        token = __generate_invoice_id_lock_token(invoice_id)
        # <token> is an integer computed locally, hence safe to interpolate
        cmd = "SELECT pg_try_advisory_lock(%s)" % token
        try:
                rows, idx = gmPG2.run_ro_queries(queries = [{'cmd': cmd}])
        except gmPG2.dbapi.ProgrammingError:
                _log.exception('cannot lock invoice ID: [%s] (%s)', invoice_id, token)
                return False

        if rows[0][0]:
                return True

        _log.error('cannot lock invoice ID: [%s] (%s)', invoice_id, token)
        return False

def unlock_invoice_id(invoice_id):
        """Release the PostgreSQL advisory lock on an invoice ID.

        Returns:
                True if the lock was released, False if not (including
                when the query raised a ProgrammingError).
        """
        _log.debug('unlocking invoice ID: %s', invoice_id)
        token = __generate_invoice_id_lock_token(invoice_id)
        # <token> is an integer computed locally, hence safe to interpolate
        cmd = u"SELECT pg_advisory_unlock(%s)" % token
        try:
                rows, idx = gmPG2.run_ro_queries(queries = [{'cmd': cmd}])
        except gmPG2.dbapi.ProgrammingError:
                _log.exception('cannot unlock invoice ID: [%s] (%s)', invoice_id, token)
                return False

        if rows[0][0]:
                return True

        _log.error('cannot unlock invoice ID: [%s] (%s)', invoice_id, token)
        return False

def generate_scan2pay_qrcode(data:str=None, create_svg:bool=False):
        """Create a QR code from scan2pay data.

        Args:
                data: the text to encode, say, as created by generate_scan2pay_string()
                create_svg: passed through to gmTools.create_qrcode()

        Returns:
                Whatever gmTools.create_qrcode() returns.
        """
        # NOTE(review): the closing bracket of this call was missing from the
        # source at hand, further arguments may have been lost along with it
        return gmTools.create_qrcode (
                text = data,
                verbose = False,
                ecc_level = 'M',                # Wikipedia says must be M
                create_svg = create_svg
        )

# NOTE(review): the parameter list was missing from the source at hand; names
# were restored from the function body and from the keyword call in
# get_scan2pay_data(), the type of <amount> is presumed from the module's
# otherwise unused "import decimal" -- confirm against upstream
def generate_scan2pay_string (
        IBAN:str=None,
        beneficiary:str=None,
        BIC:str=None,
        amount:decimal.Decimal=None,
        invoice_id:str=None,
        comment:str=None
) -> str:
        """Create scan2pay data for generating a QR code.

        Layout (field, maximum length, meaning):

        BCD                             # (3) fixed, barcode tag
        002                             # (3) fixed, version
        1                               # (1) charset, 1 = utf8
        SCT                             # (3) fixed
        <BIC>                           # (11)
        <name of beneficiary>           # (70) recipient - praxis
        <IBAN>                          # (34)
        EUR<amount>                     # (12) amount in EURO, "EUR12.5"
                                        # (4) <purpose of transfer> - empty
                                        # (35) <remittance info - struct> - only this XOR the next field - GNUmed: empty
        <remittance info - text>        # (140) "Client:Marie Louise La Lune" - "invoice number, date"
        <beneficiary-to-payor info>     # (70) "pay soon :-)" - optional - GNUmed: only if bytes are available

        total: 331 bytes (not chars ! - cave UTF8)
        EOL: LF or CRLF
        last *used* element not followed by anything, IOW can omit pending non-used elements

        Args:
                IBAN: required, cut to 34 characters
                beneficiary: required, cut to 70 characters
                BIC: optional, cut to 11 characters
                amount: required, str()-ed and cut to 9 characters
                invoice_id: required, cut to 140 characters
                comment: optional, defaults to today's date, cut to 70 characters

        Returns:
                The LF-separated scan2pay string, at most 331 bytes when encoded as UTF-8.

        Raises:
                AssertionError: if a required argument is missing or empty.
        """
        assert IBAN, '<IBAN> must be given'
        assert beneficiary, '<beneficiary> must be given'
        assert amount, '<amount> must be given'
        assert invoice_id, '<invoice_id> must be given'

        data = {}
        data['IBAN'] = IBAN[:34]
        data['beneficiary'] = beneficiary[:70]
        if not BIC:
                BIC = ''
        data['BIC'] = BIC[:11]
        data['amount'] = str(amount)[:9]
        data['ref'] = invoice_id[:140]
        if not comment:
                comment = gmDateTime.pydt_now_here().strftime('%Y %b %d')
        data['cmt'] = comment[:70]
        data_str = 'BCD\n002\n1\nSCT\n%(BIC)s\n%(beneficiary)s\n%(IBAN)s\nEUR%(amount)s\n\n\n%(ref)s\n%(cmt)s' % data
        data_str_bytes = bytes(data_str, 'utf8')[:331]
        # cutting at a byte offset may split a multi-byte character, which
        # used to raise UnicodeDecodeError: drop the incomplete tail instead
        return str(data_str_bytes, 'utf8', 'ignore')

def get_scan2pay_data(branch, bill, provider=None, comment=None):
        """Format data from bill, branch, and provider for scan2pay QR code generation.

        Args:
                branch: praxis branch, must provide .get_external_ids() and ['praxis']
                bill: the bill, must provide ['total_amount_with_vat'] and ['invoice_id']
                provider: optional, used to prefix the beneficiary with the provider's last name
                comment: passed on to generate_scan2pay_string()

        Returns:
                The scan2pay string, or None if the branch has no IBAN.
        """
        assert (branch is not None), '<branch> must not be <None>'
        assert (bill is not None), '<bill> must not be <None>'

        IBANs = branch.get_external_ids(id_type = 'IBAN', issuer = 'Bank')
        if len(IBANs) == 0:
                _log.debug('no IBAN found, cannot create scan2pay data')
                return None

        IBAN = IBANs[0]['value']
        beneficiary = gmTools.coalesce (
                value2test = provider,
                return_instead = branch['praxis'][:70],
                template4value = '%%(lastnames)s, %s' % branch['praxis']
        )
        BICs = branch.get_external_ids(id_type = 'BIC', issuer = 'Bank')
        if BICs:
                BIC = BICs[0]['value']
        else:
                # this "else:" had been lost, which always blanked the BIC
                BIC = ''
        amount = bill['total_amount_with_vat']
        # NOTE(review): the first argument and the end of this expression were
        # missing from the source at hand; bill['invoice_id'] and the cut to
        # 140 characters (the remittance info limit) are restored -- confirm
        invoice_id = (_('Inv: %s, %s') % (
                bill['invoice_id'],
                gmDateTime.pydt_strftime(gmDateTime.pydt_now_here(), '%d.%B %Y')
        ))[:140]
        return generate_scan2pay_string (
                IBAN = IBAN,
                beneficiary = beneficiary,
                BIC = BIC,
                amount = amount,
                invoice_id = invoice_id,
                comment = comment
        )

def __get_scan2pay_data(branch, bill, provider=None, comment=None):
        """Create scan2pay data for generating a QR code.

        Self-contained variant which assembles the string itself.

        Layout, one element per line, "(n)" being the maximum field length:

        BCD                             # (3) fixed, barcode tag
        002                             # (3) fixed, version
        1                               # (1) charset, 1 = utf8
        SCT                             # (3) fixed
        <BIC>                           # (11)
        <name of beneficiary>           # (70) the recipient - provider and/or praxis
        <IBAN>                          # (34)
        EUR<amount in EURO>             # (12) "EUR12.5"
        <purpose of transfer>           # (4) left empty
        <remittance info - struct>      # (35) only this XOR the next field - GNUmed: left empty
        <remittance info - text>        # (140) invoice ID and date
        <beneficiary-to-payor info>     # (70) "pay soon :-)" - optional - GNUmed: only if bytes are available

        total: 331 bytes (not chars ! - beware of UTF8)
        EOL: LF or CRLF
        last *used* element not followed by anything, IOW can omit pending non-used elements

        Returns:
                The scan2pay string, or None if the branch has no IBAN issued by 'Bank'.
        """
        assert (branch is not None), '<branch> must not be <None>'
        assert (bill is not None), '<bill> must not be <None>'

        IBANs = branch.get_external_ids(id_type = 'IBAN', issuer = 'Bank')
        if not IBANs:
                _log.debug('no IBAN found, cannot create scan2pay data')
                return None

        data = {}
        data['IBAN'] = IBANs[0]['value'][:34]
        data['beneficiary'] = gmTools.coalesce (
                value2test = provider,
                return_instead = branch['praxis'][:70],
                template4value = '%%(lastnames)s, %s' % branch['praxis']
        )[:70]
        BICs = branch.get_external_ids(id_type = 'BIC', issuer = 'Bank')
        data['BIC'] = BICs[0]['value'][:11] if BICs else ''
        # the amount need not be a string (the public generator accepts int/Decimal), so convert before slicing
        data['amount'] = str(bill['total_amount_with_vat'])[:9]
        # NOTE(review): the first value is assumed to be the bill's invoice ID -- confirm
        data['ref'] = (_('Inv: %s, %s') % (
                bill['invoice_id'],
                gmDateTime.pydt_strftime(gmDateTime.pydt_now_here(), '%d.%B %Y')
        ))[:140]
        # the comment carries its own leading EOL so that nothing is emitted when it is missing
        data['cmt'] = gmTools.coalesce(comment, '', '\n%s')[:70]

        data_str = 'BCD\n002\n1\nSCT\n%(BIC)s\n%(beneficiary)s\n%(IBAN)s\nEUR%(amount)s\n\n\n%(ref)s%(cmt)s' % data
        # the limit is in bytes: drop a multi-byte character split by the cut rather than fail to decode
        return data_str.encode('utf8')[:331].decode('utf8', errors = 'ignore')

# main
if __name__ == "__main__":

        # manual test harness, only runs when invoked as "<script> test"
        if len(sys.argv) < 2:
                sys.exit()

        if sys.argv[1] != 'test':
                sys.exit()

#       from Gnumed.pycommon import gmLog2
#       from Gnumed.pycommon import gmI18N
#       from Gnumed.business import gmPerson
        from Gnumed.business import gmPraxis

#       gmI18N.activate_locale()

        def test_default_address():
                """Show the default (billing) address of the first bill of patient #12."""
                bills = get_bills(pk_patient = 12)
                first_bill = bills[0]
                print(first_bill.default_address)

        def test_me():
                """Dump all fields of billable #1."""
                me = cBillable(aPK_obj=1)
                fields = me.get_fields()
                for field in fields:
                        print(field, ':', me[field])
                print("updatable:", me.get_updatable_fields())
                #me['vat']=4; me.store_payload()

        def test_get_scan2pay_data():
                """Generate scan2pay data with an overlong comment to exercise truncation."""
                prax = gmPraxis.get_praxis_branches()[0]
                bills = get_bills(pk_patient = 12)
                # NOTE(review): positional arguments assumed to be the local branch and first bill -- confirm
                print(get_scan2pay_data (
                        prax,
                        bills[0],
                        comment = 'GNUmed test harness' + ('x' * 400)
                ))

        def test_generate_invoice_id():
                """Generate classic and templated invoice IDs, then lock and unlock one."""
                from Gnumed.pycommon import gmI18N
                from Gnumed.business import gmPerson
                for idx in range(1,15):
                        print ('')
                        print ('classic:', generate_invoice_id(pk_patient = idx))
                        pat = gmPerson.cPerson(idx)
                        template = u'%(firstname).4s%(lastname).4s%(date)s'
                        print ('new: template = "%s" => %s' % (
                                template,
                                generate_invoice_id (
                                        template = template,
                                        pk_patient = None,
                                        person = pat,
                                )
                        ))
                        template = u'%(firstname).4s%(lastname).4s%(date)s-#counter#'
                        new_id = generate_invoice_id (
                                template = template,
                                pk_patient = None,
                                person = pat,
                        )
                        print('locked: %s' % lock_invoice_id(new_id))
                        print ('new: template = "%s" => %s' % (template, new_id))
                        print('unlocked: %s' % unlock_invoice_id(new_id))

                #generate_invoice_id(template=None, pk_patient=None, person=None, date_format='%Y-%m-%d', time_format='%H%M%S')

        def test_generate_scan2pay_string():
                """Print a scan2pay string built from fixed sample data."""
                print(generate_scan2pay_string (
                        IBAN = 'DE014032403423',
                        beneficiary = 'GNUmed developers',
                        BIC = 'NDOLSD99X',
                        amount = '1.99',
                        invoice_id = 'GM-01-1234-x034'
                        , comment = 'test'
                ))

        def test_generate_scan2pay_qrcode():
                """Render fixed sample scan2pay data into a QR code and print the result."""
                scan2pay = generate_scan2pay_string (
                        IBAN = 'DE014032403423',
                        beneficiary = 'GNUmed developers',
                        BIC = 'NDOLSD99X',
                        amount = '1.99',
                        invoice_id = 'GM-01-1234-x034'
                        #, comment = 'test'
                )
                print(generate_scan2pay_qrcode(data = scan2pay))

        gmPG2.request_login_params(setup_pool = True)

        # NOTE(review): no call selecting which test to run is visible here -- add the wanted test_*() call(s)



def create_bill(conn=None, invoice_id=None)
Expand source code
def create_bill(conn=None, invoice_id=None):
        """Create a new bill in the database.

        Args:
                conn: passed to gmPG2.run_rw_queries() as <link_obj>
                invoice_id: invoice ID of the new bill, an empty string is stored as NULL

        Returns:
                A cBill instance loaded by the primary key of the new row.
        """
        args = {'inv_id': invoice_id}
        cmd = """
                INSERT INTO bill.bill (invoice_id)
                VALUES (gm.nullify_empty_string(%(inv_id)s))
                RETURNING pk
        """
        rows, idx = gmPG2.run_rw_queries(link_obj = conn, queries = [{'cmd': cmd, 'args': args}], return_data = True, get_col_idx = False)

        return cBill(aPK_obj = rows[0]['pk'])
def create_bill_item(pk_encounter=None, pk_billable=None, pk_staff=None)
Expand source code
def create_bill_item(pk_encounter=None, pk_billable=None, pk_staff=None):
        """Create a new bill item from a billable.

        Amount and currency of the new item are taken from the billable.

        Args:
                pk_encounter: primary key of the encounter to bill
                pk_billable: primary key of the billable the item is based on
                pk_staff: primary key of the staff member providing the item

        Returns:
                A cBillItem instance loaded by the primary key of the new row.
        """
        billable = cBillable(aPK_obj = pk_billable)
        # NOTE(review): only <fk_billable> is confirmed by other queries in this module,
        # the remaining column names are assumed -- verify against the bill.bill_item schema
        cmd = """
                INSERT INTO bill.bill_item (
                        fk_provider,
                        fk_encounter,
                        net_amount_per_unit,
                        currency,
                        fk_billable
                ) VALUES (
                        %(staff)s,
                        %(enc)s,
                        %(val)s,
                        %(curr)s,
                        %(billable)s
                )
                RETURNING pk"""
        args = {
                'staff': pk_staff,
                'enc': pk_encounter,
                'val': billable['raw_amount'],
                'curr': billable['currency'],
                'billable': pk_billable
        }
        rows, idx = gmPG2.run_rw_queries(queries = [{'cmd': cmd, 'args': args}], return_data = True)
        return cBillItem(aPK_obj = rows[0][0])
def create_billable(code=None, term=None, data_source=None, return_existing=False)
Expand source code
def create_billable(code=None, term=None, data_source=None, return_existing=False):
        """Create a billable unless one with the same code, term, and data source exists.

        Args:
                code: billable code, surrounding whitespace is stripped
                term: billable description, surrounding whitespace is stripped
                data_source: primary key of the data source (catalog)
                return_existing: if the billable already exists, return it rather than None

        Returns:
                The new cBillable; or, if nothing was inserted, the existing
                cBillable (<return_existing> is True) or None.
        """
        args = {
                'code': code.strip(),
                'term': term.strip(),
                'data_src': data_source
        }
        # insert only if there is no identical billable yet
        cmd = """
                INSERT INTO ref.billable (code, term, fk_data_source)
                SELECT
                        %(code)s,
                        %(term)s,
                        %(data_src)s
                WHERE NOT EXISTS (
                        SELECT 1 FROM ref.billable
                        WHERE
                                code = %(code)s
                                        AND
                                term = %(term)s
                                        AND
                                fk_data_source = %(data_src)s
                )
                RETURNING pk"""
        rows, idx = gmPG2.run_rw_queries(queries = [{'cmd': cmd, 'args': args}], get_col_idx = False, return_data = True)
        if rows:
                return cBillable(aPK_obj = rows[0]['pk'])

        if not return_existing:
                return None

        cmd = """
                SELECT * FROM ref.v_billables
                WHERE
                        code = %(code)s
                                AND
                        term = %(term)s
                                AND
                        pk_data_source = %(data_src)s
        """
        rows, idx = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}], get_col_idx = True)
        if not rows:
                # neither inserted nor found
                return None

        return cBillable(row = {'data': rows[0], 'idx': idx, 'pk_field': 'pk_billable'})
def delete_bill(link_obj=None, pk_bill=None)
Expand source code
def delete_bill(link_obj=None, pk_bill=None):
        """Delete the bill with primary key <pk_bill>.

        Args:
                link_obj: passed through to gmPG2.run_rw_queries()
                pk_bill: primary key of the bill to delete

        Returns:
                Always True.
        """
        deletion = {
                'cmd': "DELETE FROM bill.bill WHERE pk = %(pk)s",
                'args': {'pk': pk_bill}
        }
        gmPG2.run_rw_queries(link_obj = link_obj, queries = [deletion])
        return True
def delete_bill_item(link_obj=None, pk_bill_item=None)
Expand source code
def delete_bill_item(link_obj=None, pk_bill_item=None):
        """Delete a bill item, but only while it is not assigned to a bill.

        Args:
                link_obj: passed through to gmPG2.run_rw_queries()
                pk_bill_item: primary key of the bill item to delete
        """
        deletion = {
                'cmd': 'DELETE FROM bill.bill_item WHERE pk = %(pk)s AND fk_bill IS NULL',
                'args': {'pk': pk_bill_item}
        }
        gmPG2.run_rw_queries(link_obj = link_obj, queries = [deletion])
def delete_billable(pk_billable=None)
Expand source code
def delete_billable(pk_billable=None):
        """Delete a billable, unless a bill item references it.

        Args:
                pk_billable: primary key of the billable to delete
        """
        cmd = """
                DELETE FROM ref.billable
                WHERE
                        pk = %(pk)s
                                AND
                        NOT EXISTS (
                                SELECT 1 FROM bill.bill_item WHERE fk_billable = %(pk)s
                        )
        """
        args = {'pk': pk_billable}
        gmPG2.run_rw_queries(queries = [{'cmd': cmd, 'args': args}])
def generate_invoice_id(template=None, pk_patient=None, person=None, date_format='%Y-%m-%d', time_format='%H%M%S')

Generate invoice ID string, based on template.

No template given -> generate old style fixed format invoice ID.

Placeholders:

    %(pk_pat)s
    %(date)s
    %(time)s: if included, #counter# is not needed (but still possible)
    %(firstname)s
    %(lastname)s
    %(dob)s

    #counter#: will be replaced by a counter, counting up from 1 until the invoice id is unique, max 999999
Expand source code
def generate_invoice_id(template=None, pk_patient=None, person=None, date_format='%Y-%m-%d', time_format='%H%M%S'):
        """Generate invoice ID string, based on template.

        No template given -> generate old style fixed format invoice ID.

        Placeholders:
                %(pk_pat)s
                %(date)s
                %(time)s
                        if included, #counter# is not *needed* (but still possible)
                %(firstname)s
                %(lastname)s
                %(dob)s

                #counter#
                        will be replaced by a counter, counting up from 1 until the invoice id is unique, max 999999

        Args:
                template: invoice ID template, None or blank selects DEFAULT_INVOICE_ID_TEMPLATE
                pk_patient: patient primary key, mutually exclusive with <person>
                person: patient object, mutually exclusive with <pk_patient>
                date_format: strftime format for %(date)s and %(dob)s, ignored with the default template
                time_format: strftime format for %(time)s, ignored with the default template

        Returns:
                The invoice ID.
        """
        assert (None in [pk_patient, person]), u'either of <pk_patient> or <person> can be defined, but not both'

        if (template is None) or (template.strip() == u''):
                template = DEFAULT_INVOICE_ID_TEMPLATE
                date_format = '%Y-%m-%d'
                time_format = '%H%M%S'
        template = template.strip()
        _log.debug('invoice ID template: %s', template)
        if (pk_patient is None) and (person is not None):
                pk_patient = person.ID
        now = gmDateTime.pydt_now_here()
        data = {}
        data['pk_pat'] = gmTools.coalesce(pk_patient, '?')
        data['date'] = gmDateTime.pydt_strftime(now, date_format).strip()
        data['time'] = gmDateTime.pydt_strftime(now, time_format).strip()
        if person is None:
                data['firstname'] = u'?'
                data['lastname'] = u'?'
                data['dob'] = u'?'
        else:
                data['firstname'] = person['firstnames'].replace(' ', gmTools.u_space_as_open_box).strip()
                data['lastname'] = person['lastnames'].replace(' ', gmTools.u_space_as_open_box).strip()
                data['dob'] = person.get_formatted_dob (
                        format = date_format,
                        none_string = u'?',
                        honor_estimation = False
                )
        candidate_invoice_id = template % data
        if u'#counter#' not in candidate_invoice_id:
                # the time is considered to make the ID sufficiently unique
                if u'%(time)s' in template:
                        return candidate_invoice_id

                # no counter and no time -> enforce a counter
                candidate_invoice_id = candidate_invoice_id + u' [##counter#]'

        _log.debug('invoice id candidate: %s', candidate_invoice_id)
        # get existing invoice IDs consistent with candidate
        # (raw strings: \s and \d are regex tokens, not Python escape sequences)
        search_term = r'^\s*%s\s*$' % gmPG2.sanitize_pg_regex(expression = candidate_invoice_id).replace(u'#counter#', r'\d+')
        cmd = u'SELECT invoice_id FROM bill.bill WHERE invoice_id ~* %(search_term)s UNION ALL SELECT invoice_id FROM audit.log_bill WHERE invoice_id ~* %(search_term)s'
        args = {'search_term': search_term}
        rows, idx = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}])
        if not rows:
                return candidate_invoice_id.replace(u'#counter#', u'1')

        # a set, because membership is tested once per counter value
        existing_invoice_ids = { r['invoice_id'].strip() for r in rows }
        counter = None
        counter_max = 999999
        for candidate_counter in range(1, counter_max + 1):
                candidate = candidate_invoice_id.replace(u'#counter#', '%s' % candidate_counter)
                if candidate not in existing_invoice_ids:
                        # use the lowest counter value still available
                        counter = candidate_counter
                        break
        if counter is None:
                # exhausted the range, unlikely (1 million bills are possible
                # even w/o any other invoice ID data) but technically possible
                _log.debug('exhausted uniqueness space of [%s] invoice IDs per template', counter_max)
                counter = '>%s[%s]' % (counter_max, data['time'])

        return candidate_invoice_id.replace(u'#counter#', '%s' % counter)
def generate_scan2pay_qrcode(data: str = None, create_svg: bool = False)
Expand source code
def generate_scan2pay_qrcode(data:str=None, create_svg:bool=False):
        """Render scan2pay data into a QR code.

        Args:
                data: the scan2pay string to encode
                create_svg: passed through to gmTools.create_qrcode()

        Returns:
                Whatever gmTools.create_qrcode() returns.
        """
        return gmTools.create_qrcode (
                text = data,
                verbose = False,
                ecc_level = 'M',                # Wikipedia says must be M
                create_svg = create_svg
        )
def generate_scan2pay_string(IBAN: str = None, beneficiary: str = None, BIC: str = None, amount: str | int | decimal.Decimal = '', invoice_id: str = None, comment: str = None) ‑> str

Create scan2pay data for generating a QR code.

Layout, one element per line ("(n)" is the maximum field length):

    BCD                             # (3) fixed, barcode tag
    002                             # (3) fixed, version
    1                               # (1) charset, 1 = utf8
    SCT                             # (3) fixed
    <BIC>                           # (11)
    <name of beneficiary>           # (70) "Empfänger" (recipient) - Praxis
    <IBAN>                          # (34)
    EUR<amount in EURO>             # (12) "EUR12.5"
    <purpose of transfer>           # (4) left empty
    <remittance info - struct>      # (35) only this XOR the next field - GNUmed: left empty
    <remittance info - text>        # (140) "Client:Marie Louise La Lune" - "Rg Nr, date"
    <beneficiary-to-payor info>     # (70) "pay soon :-)" - optional - GNUmed: only if bytes are available

total: 331 bytes (not chars! - beware of UTF8)
EOL: LF or CRLF
The last used element is not followed by anything, IOW pending unused elements can be omitted.

Expand source code
def generate_scan2pay_string (
) -> str:
        """Create scan2pay data for generating a QR code.
        BCD                                                                                                             # (3) fixed, barcode tag
        002                                                                                                             # (3) fixed, version
        1                                                                                                               # (1) charset, 1 = utf8
        SCT                                                                                                             # (3) fixed
        $<praxis_id::BIC//Bank//%(value)s::11>$                                 # (11) <BIC>
        $2<range_of::$<current_provider_name::%(lastnames)s::>$,$<praxis::%(praxis)s::>$::70>2$                 # (70) <Name of beneficiary> "Empfänger" - Praxis
        $<praxis_id::IBAN//Bank//%(value)s::34>$                                # (34) <IBAN>
        EUR$<bill::%(total_amount_with_vat)s::12>$                              # (12) <Amount in EURO> "EUR12.5"
                                                                                                                        # (4) <purpose of transfer> - leer
                                                                                                                        # (35) <remittance info - struct> - only this XOR the next field - GNUmed: leer
        $2<range_of::InvID=$<bill::%(invoice_id)s::>$/Date=$<today::%d.%B %Y::>$::140$>2$       # (140) <remittance info - text> "Client:Marie Louise La Lune" - "Rg Nr, date"
        <beneficiary-to-payor info>                                                             # (70)  "pay soon :-)" - optional - GNUmed nur wenn bytes verfügbar
        total: 331 bytes (not chars ! - cave UTF8)
        EOL: LF or CRLF
        last *used* element not followed by anything, IOW can omit pending non-used elements
        assert IBAN, '<IBAN> must be given'
        assert beneficiary, '<beneficiary> must be given'
        assert amount, '<amount> must be given'
        assert invoice_id, '<invoice_id> must be given'

        data = {}
        data['IBAN'] = IBAN[:34]
        data['beneficiary'] = beneficiary[:70]
        if not BIC:
                BIC = ''
        data['BIC'] = BIC[:11]
        data['amount'] = str(amount)[:9]
        data['ref'] = invoice_id[:140]
        if not comment:
                comment = gmDateTime.pydt_now_here().strftime('%Y %b %d')
        data['cmt'] = comment[:70]
        data_str = 'BCD\n002\n1\nSCT\n%(BIC)s\n%(beneficiary)s\n%(IBAN)s\nEUR%(amount)s\n\n\n%(ref)s\n%(cmt)s' % data
        data_str_bytes = bytes(data_str, 'utf8')[:331]
        return str(data_str_bytes, 'utf8')
def get_bill_items(pk_patient=None, non_invoiced_only=False, return_pks=False)
Expand source code
def get_bill_items(pk_patient=None, non_invoiced_only=False, return_pks=False):
        """Retrieve the bill items of a patient.

        Args:
                pk_patient: primary key of the patient
                non_invoiced_only: only return items not yet assigned to a bill
                return_pks: return primary keys rather than cBillItem instances

        Returns:
                A list of cBillItem instances or of bill item primary keys.
        """
        if non_invoiced_only:
                cmd = _SQL_get_bill_item_fields % u"pk_patient = %(pat)s AND pk_bill IS NULL"
        else:
                cmd = _SQL_get_bill_item_fields % u"pk_patient = %(pat)s"
        args = {'pat': pk_patient}
        rows, idx = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}], get_col_idx = True)
        if return_pks:
                return [ r['pk_bill_item'] for r in rows ]
        return [ cBillItem(row = {'data': r, 'idx': idx, 'pk_field': 'pk_bill_item'}) for r in rows ]
def get_bill_receiver(pk_patient=None)
Expand source code
def get_bill_receiver(pk_patient=None):
        """Placeholder, does nothing as yet.

        NOTE(review): no implementation is visible for this function -- confirm it is meant to be a stub.

        Returns:
                None
        """
        return None
def get_billables(active_only=True, order_by=None, return_pks=False)
Expand source code
def get_billables(active_only=True, order_by=None, return_pks=False):
        """Retrieve billables.

        Args:
                active_only: only return billables flagged as active
                order_by: SQL ORDER BY expression (without the keyword), defaults to catalog, version, code
                return_pks: return primary keys rather than cBillable instances

        Returns:
                A list of cBillable instances or of billable primary keys.
        """
        if order_by is None:
                order_by = ' ORDER BY catalog_long, catalog_version, billable_code'
        else:
                # NOTE: interpolated verbatim into the SQL, must never be fed untrusted input
                order_by = ' ORDER BY %s' % order_by

        where = 'active IS true' if active_only else 'true'

        cmd = (_SQL_get_billable_fields % where) + order_by
        rows, idx = gmPG2.run_ro_queries(queries = [{'cmd': cmd}], get_col_idx = True)
        if return_pks:
                return [ r['pk_billable'] for r in rows ]
        return [ cBillable(row = {'data': r, 'idx': idx, 'pk_field': 'pk_billable'}) for r in rows ]
def get_bills(order_by=None, pk_patient=None, return_pks=False)
Expand source code
def get_bills(order_by=None, pk_patient=None, return_pks=False):
        """Retrieve bills, optionally limited to one patient.

        Args:
                order_by: SQL ORDER BY expression (without the keyword), no explicit ordering if None
                pk_patient: primary key of the patient, None retrieves the bills of all patients
                return_pks: return primary keys rather than cBill instances

        Returns:
                A list of cBill instances or of bill primary keys.
        """
        args = {'pat': pk_patient}
        # the constant condition keeps the WHERE clause valid when there is no filter
        where_parts = ['true']

        if pk_patient is not None:
                where_parts.append('pk_patient = %(pat)s')

        if order_by is None:
                order_by = ''
        else:
                # NOTE: interpolated verbatim into the SQL, must never be fed untrusted input
                order_by = ' ORDER BY %s' % order_by

        cmd = (_SQL_get_bill_fields % ' AND '.join(where_parts)) + order_by
        rows, idx = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}], get_col_idx = True)
        if return_pks:
                return [ r['pk_bill'] for r in rows ]
        return [ cBill(row = {'data': r, 'idx': idx, 'pk_field': 'pk_bill'}) for r in rows ]
def get_bills4document(pk_document=None)
Expand source code
def get_bills4document(pk_document=None):
        """Retrieve the bills linked to a document.

        Args:
                pk_document: primary key of the document

        Returns:
                A list of cBill instances.
        """
        query = {
                'cmd': _SQL_get_bill_fields % 'pk_doc = %(pk_doc)s',
                'args': {'pk_doc': pk_document}
        }
        rows, idx = gmPG2.run_ro_queries(queries = [query], get_col_idx = True)
        bills = []
        for row in rows:
                bills.append(cBill(row = {'data': row, 'idx': idx, 'pk_field': 'pk_bill'}))
        return bills
def get_scan2pay_data(branch, bill, provider=None, comment=None)

Format data from bill, branch, and provider for scan2pay QR code generation.

Expand source code
def get_scan2pay_data(branch, bill, provider=None, comment=None):
        """Format data from bill, branch, and provider for scan2pay QR code generation.

        Args:
                branch: praxis branch; its 'Bank'-issued IBAN/BIC external IDs and its 'praxis' field are read
                bill: the bill; its 'total_amount_with_vat' and 'invoice_id' fields are read
                provider: optional; if given, "<lastnames>, <praxis>" becomes the beneficiary
                        (presumably a mapping offering a 'lastnames' key -- confirm against callers)
                comment: optional beneficiary-to-payor text, passed through unchanged

        Returns:
                The string built by generate_scan2pay_string(),
                or None if the branch has no IBAN issued by 'Bank'.
        """
        assert (branch is not None), '<branch> must not be <None>'
        assert (bill is not None), '<bill> must not be <None>'

        IBANs = branch.get_external_ids(id_type = 'IBAN', issuer = 'Bank')
        if not IBANs:
                _log.debug('no IBAN found, cannot create scan2pay data')
                return None

        beneficiary = gmTools.coalesce (
                value2test = provider,
                return_instead = branch['praxis'][:70],
                template4value = '%%(lastnames)s, %s' % branch['praxis']
        )
        # the BIC is optional, fall back to an empty field
        BICs = branch.get_external_ids(id_type = 'BIC', issuer = 'Bank')
        BIC = BICs[0]['value'] if BICs else ''
        # NOTE(review): the first value is assumed to be the bill's invoice ID -- confirm
        invoice_id = _('Inv: %s, %s') % (
                bill['invoice_id'],
                gmDateTime.pydt_strftime(gmDateTime.pydt_now_here(), '%d.%B %Y')
        )
        # field length limits are enforced by generate_scan2pay_string()
        return generate_scan2pay_string (
                IBAN = IBANs[0]['value'],
                beneficiary = beneficiary,
                BIC = BIC,
                amount = bill['total_amount_with_vat'],
                invoice_id = invoice_id,
                comment = comment
        )
def lock_invoice_id(invoice_id)

Lock an invoice ID.

The lock taken is an exclusive advisory lock in PostgreSQL.

Because the data is short and crc32/adler32 are fairly weak we assume that collisions can be created "easily". Therefore we apply both algorithms concurrently.

NOT compatible with anything 1.8 or below.

Expand source code
def lock_invoice_id(invoice_id):
        """Lock an invoice ID.

        The lock taken is an exclusive advisory lock in PostgreSQL.

        Because the data is short _and_ crc32/adler32 are fairly
        weak we assume that collisions can be created "easily".
        Therefore we apply both algorithms concurrently.

        NOT compatible with anything 1.8 or below.

        Args:
                invoice_id: the invoice ID to lock

        Returns:
                True if the lock was taken, False if not or if the query failed.
        """
        _log.debug('locking invoice ID: %s', invoice_id)
        token = __generate_invoice_id_lock_token(invoice_id)
        cmd = "SELECT pg_try_advisory_lock(%s)" % token
        try:
                rows, idx = gmPG2.run_ro_queries(queries = [{'cmd': cmd}])
        except gmPG2.dbapi.ProgrammingError:
                _log.exception('cannot lock invoice ID: [%s] (%s)', invoice_id, token)
                return False

        # pg_try_advisory_lock() does not wait but reports whether the lock was obtained
        if rows[0][0]:
                return True

        _log.error('cannot lock invoice ID: [%s] (%s)', invoice_id, token)
        return False
def unlock_invoice_id(invoice_id)
Expand source code
def unlock_invoice_id(invoice_id):
        """Release the advisory lock on an invoice ID.

        Args:
                invoice_id: the invoice ID to unlock

        Returns:
                True if the lock was released, False if not or if the query failed.
        """
        _log.debug('unlocking invoice ID: %s', invoice_id)
        token = __generate_invoice_id_lock_token(invoice_id)
        cmd = u"SELECT pg_advisory_unlock(%s)" % token
        try:
                rows, idx = gmPG2.run_ro_queries(queries = [{'cmd': cmd}])
        except gmPG2.dbapi.ProgrammingError:
                _log.exception('cannot unlock invoice ID: [%s] (%s)', invoice_id, token)
                return False

        if rows[0][0]:
                return True

        _log.error('cannot unlock invoice ID: [%s] (%s)', invoice_id, token)
        return False


class cBill (aPK_obj: int | dict = None, row: dict = None, link_obj=None)

Represents a bill

Call init from child classes like so:

    super().__init__(aPK_obj = aPK_obj, row = row, link_obj = link_obj)


Args:

aPK_obj: retrieve data from backend
  • a simple value: the primary key WHERE condition must be a simple column
  • a dictionary of values: the primary key WHERE condition must be a subselect consuming the dict and producing the single-value primary key
row: must hold the fields
  • idx: a dict mapping field names to position
  • data: the field values in a list (as returned by cursor.fetchone() in the DB-API)
  • pk_field: the name of the primary key field OR
  • pk_obj: a dictionary suitable for being passed to cursor.execute and holding the primary key values, used for composite PKs
  • for example:
    row = {
            'data': rows[0],
            'idx': idx,
            'pk_field': 'pk_XXX (the PK column name)',
            'pk_obj': {'pk_col1': pk_col1_val, 'pk_col2': pk_col2_val}
    }
    rows, idx = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}], get_col_idx = True)
    objects = [ cChildClass(row = {'data': r, 'idx': idx, 'pk_field': 'the PK column name'}) for r in rows ]
Expand source code
class cBill(gmBusinessDBObject.cBusinessDBObject):
        """Represents a bill"""

        _cmd_fetch_payload = _SQL_get_bill_fields % "pk_bill = %s"
        # the xmin condition makes the update fail if the row was changed since it was loaded
        _cmds_store_payload = [
                """UPDATE bill.bill SET
                                invoice_id = gm.nullify_empty_string(%(invoice_id)s),
                                close_date = %(close_date)s,
                                apply_vat = %(apply_vat)s,
                                comment = gm.nullify_empty_string(%(comment)s),
                                fk_receiver_identity = %(pk_receiver_identity)s,
                                fk_receiver_address = %(pk_receiver_address)s,
                                fk_doc = %(pk_doc)s
                        WHERE
                                pk = %(pk_bill)s
                                        AND
                                xmin = %(xmin_bill)s
                        RETURNING
                                pk as pk_bill,
                                xmin as xmin_bill
                """
        ]
        # NOTE(review): derived from the fields written by the UPDATE above -- confirm
        _updatable_fields = [
                'invoice_id',
                'pk_receiver_identity',
                'close_date',
                'apply_vat',
                'comment',
                'pk_receiver_address',
                'pk_doc'
        ]

        def format(self, include_receiver=True, include_doc=True):
                """Return a multi-line, human readable description of the bill.

                Args:
                        include_receiver: include receiver identity and address
                        include_doc: include the reference to the invoice document
                """
                # NOTE(review): the value shown as [#...] is assumed to be the bill's primary key -- confirm
                txt = '%s                       [#%s]\n' % (
                        gmTools.bool2subst (
                                (self._payload[self._idx['close_date']] is None),
                                _('Open bill'),
                                _('Closed bill')
                        ),
                        self._payload[self._idx['pk_bill']]
                )
                txt += _(' Invoice ID: %s\n') % self._payload[self._idx['invoice_id']]

                if self._payload[self._idx['close_date']] is not None:
                        txt += _(' Closed: %s\n') % gmDateTime.pydt_strftime (
                                self._payload[self._idx['close_date']],
                                '%Y %b %d',
                                accuracy = gmDateTime.acc_days
                        )

                if self._payload[self._idx['comment']] is not None:
                        txt += _(' Comment: %s\n') % self._payload[self._idx['comment']]

                txt += _(' Bill value: %(curr)s%(val)s\n') % {
                        'curr': self._payload[self._idx['currency']],
                        'val': self._payload[self._idx['total_amount']]
                }

                # apply_vat is tri-state: None = undecided, True = applies, otherwise does not apply
                if self._payload[self._idx['apply_vat']] is None:
                        txt += _(' VAT: undecided\n')
                elif self._payload[self._idx['apply_vat']] is True:
                        txt += _(' VAT: %(perc_vat)s%% %(equals)s %(curr)s%(vat)s\n') % {
                                'perc_vat': self._payload[self._idx['percent_vat']],
                                'equals': gmTools.u_corresponds_to,
                                'curr': self._payload[self._idx['currency']],
                                'vat': self._payload[self._idx['total_vat']]
                        }
                        txt += _(' Value + VAT: %(curr)s%(val)s\n') % {
                                'curr': self._payload[self._idx['currency']],
                                'val': self._payload[self._idx['total_amount_with_vat']]
                        }
                else:
                        txt += _(' VAT: does not apply\n')

                if self._payload[self._idx['pk_bill_items']] is None:
                        txt += _(' Items billed: 0\n')
                else:
                        txt += _(' Items billed: %s\n') % len(self._payload[self._idx['pk_bill_items']])
                if include_doc:
                        txt += _(' Invoice: %s\n') % (
                                gmTools.bool2subst (
                                        self._payload[self._idx['pk_doc']] is None,
                                        _('not available'),
                                        '#%s' % self._payload[self._idx['pk_doc']]
                                )
                        )
                txt += _(' Patient: #%s\n') % self._payload[self._idx['pk_patient']]
                if include_receiver:
                        # NOTE(review): the value tested is assumed to be the receiver identity -- confirm
                        txt += gmTools.coalesce (
                                self._payload[self._idx['pk_receiver_identity']],
                                '',
                                _(' Receiver: #%s\n')
                        )
                        if self._payload[self._idx['pk_receiver_address']] is not None:
                                txt += '\n '.join(gmDemographicRecord.get_patient_address(pk_patient_address = self._payload[self._idx['pk_receiver_address']]).format())

                return txt

        def add_items(self, items=None):
                """Requires no pending changes within the bill itself."""
                # should check for item consistency first
                conn = gmPG2.get_connection(readonly = False)
                for item in items:
                        item['pk_bill'] = self._payload[self._idx['pk_bill']]
                        item.save(conn = conn)
                # NOTE(review): commit assumed to be needed on this dedicated connection -- confirm
                conn.commit()
                self.refetch_payload()          # make sure aggregates are re-filled from view

        def _get_bill_items(self):
                """The items on this bill, as a list of cBillItem instances."""
                pks = self._payload[self._idx['pk_bill_items']]
                # None means "no items", see format()
                if pks is None:
                        return []
                return [ cBillItem(aPK_obj = pk) for pk in pks ]

        bill_items = property(_get_bill_items)

        def _get_invoice(self):
                """The invoice document of this bill, None if there is none."""
                if self._payload[self._idx['pk_doc']] is None:
                        return None
                return gmDocuments.cDocument(aPK_obj = self._payload[self._idx['pk_doc']])

        invoice = property(_get_invoice)

        def _get_address(self):
                """The receiver address of this bill, None if not set."""
                if self._payload[self._idx['pk_receiver_address']] is None:
                        return None
                return gmDemographicRecord.get_address_from_patient_address_pk (
                        pk_patient_address = self._payload[self._idx['pk_receiver_address']]
                )

        address = property(_get_address)

        def _get_default_address(self):
                """The patient's address of type 'billing'."""
                return gmDemographicRecord.get_patient_address_by_type (
                        pk_patient = self._payload[self._idx['pk_patient']],
                        adr_type = 'billing'
                )

        default_address = property(_get_default_address)

        def _get_home_address(self):
                """The patient's address of type 'home'."""
                return gmDemographicRecord.get_patient_address_by_type (
                        pk_patient = self._payload[self._idx['pk_patient']],
                        adr_type = 'home'
                )

        home_address = property(_get_home_address)

        def set_missing_address_from_default(self):
                """Set the receiver address, if missing, from the billing or else the home address.

                Returns:
                        True if an address was already set, False if no address to
                        set was found, else the result of saving the bill.
                """
                if self._payload[self._idx['pk_receiver_address']] is not None:
                        return True
                adr = self.default_address
                if adr is None:
                        adr = self.home_address
                        if adr is None:
                                return False
                self['pk_receiver_address'] = adr['pk_lnk_person_org_address']
                return self.save_payload()


Instance variables

var address
Expand source code
def _get_address(self):
        if self._payload[self._idx['pk_receiver_address']] is None:
                return None
        return gmDemographicRecord.get_address_from_patient_address_pk (
                pk_patient_address = self._payload[self._idx['pk_receiver_address']]
var bill_items
Expand source code
def _get_bill_items(self):
        """Instantiate one cBillItem per primary key listed in ``pk_bill_items``."""
        bill_items = []
        for pk_item in self._payload[self._idx['pk_bill_items']]:
                bill_items.append(cBillItem(aPK_obj = pk_item))
        return bill_items
var default_address
Expand source code
def _get_default_address(self):
        """Look up the address of type 'billing' for this bill's patient.

        Returns whatever gmDemographicRecord.get_patient_address_by_type()
        returns for ``pk_patient``.
        """
        return gmDemographicRecord.get_patient_address_by_type (
                pk_patient = self._payload[self._idx['pk_patient']],
                adr_type = 'billing'
        )
var home_address
Expand source code
def _get_home_address(self):
        """Look up the address of type 'home' for this bill's patient.

        Returns whatever gmDemographicRecord.get_patient_address_by_type()
        returns for ``pk_patient``.
        """
        return gmDemographicRecord.get_patient_address_by_type (
                pk_patient = self._payload[self._idx['pk_patient']],
                adr_type = 'home'
        )
var invoice
Expand source code
def _get_invoice(self):
        """Return the document linked to this bill via ``pk_doc``, or None.

        When the ``pk_doc`` payload field is set, a gmDocuments.cDocument
        is instantiated from that primary key.
        """
        pk_doc = self._payload[self._idx['pk_doc']]
        if pk_doc is not None:
                return gmDocuments.cDocument(aPK_obj = pk_doc)
        return None


def add_items(self, items=None)

Requires no pending changes within the bill itself.

Expand source code
def add_items(self, items=None):
        """Attach bill items to this bill and store them.

        Requires no pending changes within the bill itself.

        Args:
                items: iterable of bill item objects. Each gets its 'pk_bill'
                        field set to this bill's primary key and is then saved.
                        None or an empty sequence is a no-op.
        """
        if not items:
                # nothing to attach: do not open a write connection at all
                return
        # should check for item consistency first
        conn = gmPG2.get_connection(readonly = False)
        for item in items:
                item['pk_bill'] = self._payload[self._idx['pk_bill']]
                # NOTE(review): this statement was mangled in the listing ("= conn)");
                # reconstructed as a save on the shared connection -- confirm upstream
                item.save(conn = conn)
        # NOTE(review): assumes the caller-supplied connection is not committed by
        # save(); an extra commit is harmless if it already was -- confirm upstream
        conn.commit()
        self.refetch_payload()          # make sure aggregates are re-filled from view
def set_missing_address_from_default(self)
Expand source code
def set_missing_address_from_default(self):
        """Fill in the receiver address if the bill does not have one yet.

        The patient's billing address is preferred, the home address is
        the fallback.

        Returns:
                True if an address was already set, False if neither a
                billing nor a home address is available, otherwise the
                result of save_payload().
        """
        already_set = self._payload[self._idx['pk_receiver_address']] is not None
        if already_set:
                return True
        fallback = self.default_address
        if fallback is None:
                # no billing address on file, try the home address instead
                fallback = self.home_address
        if fallback is None:
                return False
        self['pk_receiver_address'] = fallback['pk_lnk_person_org_address']
        return self.save_payload()

Inherited members

class cBillItem (aPK_obj: int | dict = None, row: dict = None, link_obj=None)

Represents business objects in the database.


  • instances ARE ASSUMED TO EXIST in the database
  • PK construction (aPK_obj): DOES verify its existence on instantiation (fetching data fails)
  • Row construction (row): allowed by using a dict of pairs of field name: field value (PERFORMANCE improvement)
  • does NOT verify FK target existence
  • does NOT create new entries in the database
  • does NOT lazy-fetch fields on access

Class scope SQL commands and variables:


_cmd_fetch_payload:
  • must return exactly one row
  • WHERE clause argument values are expected in self.pk_obj (taken from __init__(aPK_obj))
  • must return xmin of all rows that _cmds_store_payload will be updating, so views must support the xmin columns of their underlying tables


_cmds_store_payload:
  • one or multiple "update … set … where xmin_ = … and pk = …" statements which actually update the database from the data in self._payload,
  • the last query must refetch at least the XMIN values needed to detect concurrent updates, their field names had better be the same as in _cmd_fetch_payload,
  • the last query CAN return other fields which is particularly useful when those other fields are computed in the backend and may thus change upon save but will not have been set by the client code explicitly - this is only really of concern if the saved subclass is to be reused after saving rather than re-instantiated
  • when subclasses tend to live a while after save_payload() was called and they support computed fields (say, _(some_column)), you need to return all columns (see cEncounter)


_updatable_fields:
  • a list of fields available for update via object['field']

Call __init__ from child classes like so:

    super().__init__(aPK_obj = aPK_obj, row = row, link_obj = link_obj)


aPK_obj: retrieve data from backend
  • a simple value: the primary key WHERE condition must be a simple column
  • a dictionary of values: the primary key WHERE condition must be a subselect consuming the dict and producing the single-value primary key
row: must hold the fields
  • idx: a dict mapping field names to position
  • data: the field values in a list (as returned by cursor.fetchone() in the DB-API)
  • pk_field: the name of the primary key field OR
  • pk_obj: a dictionary suitable for passing to cursor.execute and holding the primary key values, used for composite PKs
  • for example:
    row = {
            'data': rows[0],
            'idx': idx,
            'pk_field': 'pk_XXX (the PK column name)',
            'pk_obj': {'pk_col1': pk_col1_val, 'pk_col2': pk_col2_val}
    }
    rows, idx = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}], get_col_idx = True)
    objects = [ cChildClass(row = {'data': r, 'idx': idx, 'pk_field': 'the PK column name'}) for r in rows ]
Expand source code
class cBillItem(gmBusinessDBObject.cBusinessDBObject):

        _cmd_fetch_payload = _SQL_get_bill_item_fields % u"pk_bill_item = %s"
        _cmds_store_payload = [
                """UPDATE bill.bill_item SET
                                fk_provider = %(pk_provider)s,
                                fk_encounter = %(pk_encounter_to_bill)s,
                                date_to_bill = %(raw_date_to_bill)s,
                                description = gm.nullify_empty_string(%(item_detail)s),
                                net_amount_per_unit = %(net_amount_per_unit)s,
                                currency = gm.nullify_empty_string(%(currency)s),
                                fk_bill = %(pk_bill)s,
                                unit_count = %(unit_count)s,
                                amount_multiplier = %(amount_multiplier)s
                                pk = %(pk_bill_item)s
                                xmin = %(xmin_bill_item)s
                                xmin AS xmin_bill_item

        _updatable_fields = [
        def format(self):
                txt = '%s (%s %s%s)         [#%s]\n' % (
                                self._payload[self._idx['pk_bill']] is None,
                                _('Open item'),
                                _('Billed item'),
                        gmTools.coalesce(self._payload[self._idx['catalog_language']], '', ' - %s'),
                txt += ' %s: %s\n' % (
                txt += gmTools.coalesce (
                        '  (%s)\n',
                txt += gmTools.coalesce (
                        _(' Details: %s\n'),

                txt += '\n'
                txt += _(' %s of units: %s\n') % (
                txt += _(' Amount per unit: %(curr)s%(val_p_unit)s (%(cat_curr)s%(cat_val)s per catalog)\n') % {
                        'curr': self._payload[self._idx['currency']],
                        'val_p_unit': self._payload[self._idx['net_amount_per_unit']],
                        'cat_curr': self._payload[self._idx['billable_currency']],
                        'cat_val': self._payload[self._idx['billable_amount']]
                txt += _(' Amount multiplier: %s\n') % self._payload[self._idx['amount_multiplier']]
                txt += _(' VAT would be: %(perc_vat)s%% %(equals)s %(curr)s%(vat)s\n') % {
                        'perc_vat': self._payload[self._idx['vat_multiplier']] * 100,
                        'equals': gmTools.u_corresponds_to,
                        'curr': self._payload[self._idx['currency']],
                        'vat': self._payload[self._idx['vat']]

                txt += '\n'
                txt += _(' Charge date: %s') % gmDateTime.pydt_strftime (
                        '%Y %b %d',
                        accuracy = gmDateTime.acc_days
                bill = self.bill
                if bill is not None:
                        txt += _('\n On bill: %s') % bill['invoice_id']

                return txt
        def _get_billable(self):
                """Instantiate the cBillable referenced by the ``pk_billable`` payload field."""
                pk_billable = self._payload[self._idx['pk_billable']]
                return cBillable(aPK_obj = pk_billable)

        billable = property(_get_billable)
        def _get_bill(self):
                """Return the cBill this item is linked to, or None if ``pk_bill`` is unset."""
                pk_bill = self._payload[self._idx['pk_bill']]
                if pk_bill is not None:
                        return cBill(aPK_obj = pk_bill)
                return None

        bill = property(_get_bill)
        def _get_is_in_use(self):
                """Tell whether this item is linked to a bill (``pk_bill`` is set)."""
                pk_bill = self._payload[self._idx['pk_bill']]
                return pk_bill is not None

        is_in_use = property(_get_is_in_use)


Instance variables

var bill
Expand source code
def _get_bill(self):
        """Return the cBill this item is linked to, or None if ``pk_bill`` is unset."""
        pk_bill = self._payload[self._idx['pk_bill']]
        if pk_bill is not None:
                return cBill(aPK_obj = pk_bill)
        return None
var billable
Expand source code
def _get_billable(self):
        """Instantiate the cBillable referenced by the ``pk_billable`` payload field."""
        pk_billable = self._payload[self._idx['pk_billable']]
        return cBillable(aPK_obj = pk_billable)
var is_in_use
Expand source code
def _get_is_in_use(self):
        """Tell whether this item is linked to a bill (``pk_bill`` is set)."""
        pk_bill = self._payload[self._idx['pk_bill']]
        return pk_bill is not None

Inherited members

class cBillable (aPK_obj: int | dict = None, row: dict = None, link_obj=None)

Items which can be billed to patients.

Call __init__ from child classes like so:

    super().__init__(aPK_obj = aPK_obj, row = row, link_obj = link_obj)


aPK_obj: retrieve data from backend
  • a simple value: the primary key WHERE condition must be a simple column
  • a dictionary of values: the primary key WHERE condition must be a subselect consuming the dict and producing the single-value primary key
row: must hold the fields
  • idx: a dict mapping field names to position
  • data: the field values in a list (as returned by cursor.fetchone() in the DB-API)
  • pk_field: the name of the primary key field OR
  • pk_obj: a dictionary suitable for passing to cursor.execute and holding the primary key values, used for composite PKs
  • for example:
    row = {
            'data': rows[0],
            'idx': idx,
            'pk_field': 'pk_XXX (the PK column name)',
            'pk_obj': {'pk_col1': pk_col1_val, 'pk_col2': pk_col2_val}
    }
    rows, idx = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}], get_col_idx = True)
    objects = [ cChildClass(row = {'data': r, 'idx': idx, 'pk_field': 'the PK column name'}) for r in rows ]
Expand source code
class cBillable(gmBusinessDBObject.cBusinessDBObject):
        """Items which can be billed to patients."""

        _cmd_fetch_payload = _SQL_get_billable_fields % "pk_billable = %s"
        _cmds_store_payload = [
                """UPDATE ref.billable SET
                                fk_data_source = %(pk_data_source)s,
                                code = %(billable_code)s,
                                term = %(billable_description)s,
                                comment = gm.nullify_empty_string(%(comment)s),
                                amount = %(raw_amount)s,
                                currency = %(currency)s,
                                vat_multiplier = %(vat_multiplier)s,
                                active = %(active)s
                                --, discountable = %(discountable)s
                                pk = %(pk_billable)s
                                xmin = %(xmin_billable)s
                                xmin AS xmin_billable

        _updatable_fields = [
        def format(self):
                txt = '%s                                    [#%s]\n\n' % (
                        gmTools.bool2subst (
                                _('Active billable item'),
                                _('Inactive billable item')
                txt += ' %s: %s\n' % (
                txt += _(' %(curr)s%(raw_val)s + %(perc_vat)s%% VAT = %(curr)s%(val_w_vat)s\n') % {
                        'curr': self._payload[self._idx['currency']],
                        'raw_val': self._payload[self._idx['raw_amount']],
                        'perc_vat': self._payload[self._idx['vat_multiplier']] * 100,
                        'val_w_vat': self._payload[self._idx['amount_with_vat']]
                txt += ' %s %s%s (%s)' % (
                        gmTools.coalesce(self._payload[self._idx['catalog_language']], '', ' - %s'),
                txt += gmTools.coalesce(self._payload[self._idx['comment']], '', '\n %s')

                return txt
        def _get_is_in_use(self):
                """Tell whether any row in bill.bill_item refers to this billable.

                Runs a read-only EXISTS query keyed on ``pk_billable`` and returns
                the first column of the first result row.
                """
                query = {
                        'cmd': 'SELECT EXISTS(SELECT 1 FROM bill.bill_item WHERE fk_billable = %(pk)s LIMIT 1)',
                        'args': {'pk': self._payload[self._idx['pk_billable']]}
                }
                # the second element of the result is not needed here
                rows, unused_idx = gmPG2.run_ro_queries(queries = [query])
                return rows[0][0]

        is_in_use = property(_get_is_in_use)


Instance variables

var is_in_use
Expand source code
def _get_is_in_use(self):
        """Tell whether any row in bill.bill_item refers to this billable.

        Runs a read-only EXISTS query keyed on ``pk_billable`` and returns
        the first column of the first result row.
        """
        query = {
                'cmd': 'SELECT EXISTS(SELECT 1 FROM bill.bill_item WHERE fk_billable = %(pk)s LIMIT 1)',
                'args': {'pk': self._payload[self._idx['pk_billable']]}
        }
        # the second element of the result is not needed here
        rows, unused_idx = gmPG2.run_ro_queries(queries = [query])
        return rows[0][0]

Inherited members