Source code for core.helpers

"""Module containing core app helper functions."""

import re

from algosdk.constants import ADDRESS_LEN
from django.conf import settings
from django.contrib import messages
from django.core.exceptions import ValidationError
from django.http import Http404

from api.client import export_status
from api.client import reset_export as _reset_export
from api.client import start_export as _start_export
from core.models import BundleName
from core.templatetags.core_extras import short_address
from utils.charts import prepare_consolidated_charts
from utils.constants.core import (
    BUNDLE_SEPARATION_CHARS,
    FORBIDDEN_ADDRESSES,
    INVALID_ADDRESS_TEXT,
    MAX_BUNDLE_SIZE,
)
from utils.constants.tax import TAX_DURATION_MESSAGE
from utils.constants.users import (
    BUNDLE_ADDRESSES_LIMIT_HELP_TEXT,
    PUBLIC_BUNDLE_ADDRESSES_LIMIT_HELP_TEXT,
    PUBLIC_BUNDLE_ADDRESSES_NOT_ALLOWED_HELP_TEXT,
)
from utils.helpers import (
    check_algorand_address,
    check_bundle_addresses,
    message_for_app_code_in_values,
)


## RAW PARSING
def _normalize_collection(collection, max_bundle_size=MAX_BUNDLE_SIZE):
    """Return collection of Algorand addresses from provided collection.

    An item in collection can either be an Algorand address, empty string
    or string of addresses separated by spaces.

    :param collection:  address or addresses separated by spaces collection
    :type collection: generator
    :param max_bundle_size: maximum allowed addresses count in bundle
    :type max_bundle_size: int
    :return: list
    """
    bundle = []
    for entry in collection:
        if " " in entry:
            bundle.extend(entry.split(" "))
        elif entry:
            bundle.append(entry)
    return bundle[:max_bundle_size]


def _parse_bundle(data):
    """Return collection of parsed addresses and/or nameservices' entries.

    :param data: multiple entries data from bundle field
    :type data: str
    :return: generator
    """
    return (
        check_algorand_address(_strip_address(address))
        for address in re.split("|".join(BUNDLE_SEPARATION_CHARS) + "|\n", data)
    )


def _strip_address(address):
    """Strip and return address without any separation characters.

    :param address: address including bundle separation character
    :type address: str
    :return: str
    """
    return address.strip().strip("".join(BUNDLE_SEPARATION_CHARS))


[docs] def addresses_from_raw(raw, address_data="", max_bundle_size=MAX_BUNDLE_SIZE): """Return address or multiple addresses separated by spaces. Raise ValidationError if raw content isn't a valid Algorand address. :var raw: multiple entries data from bundle field :type raw: str :var address_data: data from address field :type address_data: str :param max_bundle_size: maximum allowed addresses count in bundle :type max_bundle_size: int :var bundle: parsed collection of addresses :type bundle: list :return: str """ if not raw: if not address_data: raise ValidationError(INVALID_ADDRESS_TEXT) if len(address_data) == ADDRESS_LEN: return check_algorand_address(address_data, raise_error=True) raw = address_data bundle = _normalize_collection(_parse_bundle(raw), max_bundle_size=max_bundle_size) if bundle: return " ".join(address for address in set(bundle) if address) raise ValidationError(INVALID_ADDRESS_TEXT)
[docs] def check_forbidden_addresses(value): """Raiose 404 error if value contains any forbidden address. :param value: space delimited addresses :type value: str """ if any(address in value for address in FORBIDDEN_ADDRESSES): raise Http404
[docs] def context_with_consolidated_data(context, serialized_data): """Update provided `context` with consolidated data created from `serialized_data`. :param context: object containing all the data needed for rendering :type context: dict :param serialized_data: serialized account's data :type serialized_data: dict :var context["distchart"]: data for rendering top ASA distribution chart :type context["distchart"]: dict :var context["ratiochart"]: data for rendering ALGO/ASA/NFT chart :type context["ratiochart"]: dict :var context["nftfloorchart"]: data for rendering NFT floors chart :type context["nftfloorchart"]: dict :var context["consolidated"]: consolidated view totals :type context["consolidated"]: :class:`utils.structs.Consolidated` :return: dict """ ( context["distchart"], context["ratiochart"], context["nftfloorchart"], context["consolidated"], ) = prepare_consolidated_charts( serialized_data, context["asas"], context["values"], context["total"], context["nft_colors"], ) return context
# # TAX CLIENT-SIDE def _send_duration_messages(request): """Create and emit Django messages about varying CSV files creation duration. :param request: Django request object :type request: :class:`django.http.HttpRequest` :var paragraph: currenty processed paragraph of text :type paragraph: str """ for paragraph in TAX_DURATION_MESSAGE.split("\n"): messages.success(request, "-" * 5) messages.success(request, paragraph) def _send_enqueued_messages(request, addresses): """Create and emit Django message for each address in `addresses` collection. :param request: Django request object :type request: :class:`django.http.HttpRequest` :param addresses: public Algorand addresses separated by spaces :type addresses: str :var address: currently processed public Algorand address :type address: str :var shorten: current address' shortened representation :type shorten: str """ for address in addresses.split(" "): shorten = short_address(address) messages.success(request, f"{shorten} enqueued for processing!")
[docs] def check_export_status(url_value): """Return tax data from cache for provided `url_value`. :param url_value: address or bundle value found in url :type url_value: str :return: dict """ return export_status(url_value)
[docs] def prepare_tax_context(context, url_value): """Prepare tax context object for provided `url_value`. :param context: object containing all the data needed for rendering :type context: dict :param url_value: address or bundle value found in url :type url_value: str :var typ: type of entry (address or bundle) :type typ: str :var addresses: address(es) value found in url :type addresses: str :return: dict """ typ, addresses = ( ("address", url_value) if len(url_value) > 50 else ("bundle", check_bundle_addresses(url_value)) ) context[typ] = addresses.split(" ") context["url_value"] = url_value return context
[docs] def reset_export(url_value): """Reset cache data for provided `url_value` and call zip deletion consumer task. :param url_value: address or bundle value found in url :type url_value: str """ return _reset_export(url_value)
[docs] def start_export(url_value, addresses, request, **kwargs): """Start addresss processing task and record it. :param url_value: address or bundle value found in url :type url_value: str :param addresses: public Algorand addresses separated by spaces :type addresses: str :param request: Django request object :type request: :class:`django.http.HttpRequest` """ _send_enqueued_messages(request, addresses) # keep the open UX messaging return _start_export(url_value, addresses) # backend enqueues process_addresses
# # PUBLIC BUNDLES def _check_public_bundles_eligibility(): """Return collection of all eligible public bundle instances. :var bundlenames: profile's bundle names collection :type bundlenames: :class:`QuerySet` :return: list """ bundlenames = BundleName.objects.filter(public=True) return [ bundlename for bundlename in bundlenames if bundlename.is_eligible_public_bundlename() ] def _create_public_bundle_files(hashes): """Create files from provided collection of hashes/filenames. :param hashes: collection of bundle names and related bundle hashes :type hashes: list :var bundlenames_path: full path to public bundles directory :type bundlenames_path: :class:`pathlib.PosixPath` :var bundle_hash: currently processed bundle hash :type bundle_hash: str """ bundlenames_path = settings.DATA_PATH / "bundles" for bundle_hash in hashes: (bundlenames_path / bundle_hash).touch() def _delete_public_bundle_files(hashes): """Delete all files from provided collection of hashes/filenames. :param hashes: collection of bundle names and related bundle hashes :type hashes: list :var bundlenames_path: full path to public bundles directory :type bundlenames_path: :class:`pathlib.PosixPath` :var bundle_hash: currently processed bundle hash :type bundle_hash: str """ bundlenames_path = settings.DATA_PATH / "bundles" for bundle_hash in hashes: (bundlenames_path / bundle_hash).unlink() def _public_bundle_filenames(): """Return collection of all existing filenames/hashes found on disk. :var bundlenames_path: full path to public bundles directory :type bundlenames_path: :class:`pathlib.PosixPath` :return: set """ bundlenames_path = settings.DATA_PATH / "bundles" return { filename.stem for filename in bundlenames_path.glob("*") if "_" in filename.stem } def _public_bundles_collection(): """Return collection of all public bundle hashes and instances. :return: list """ return [ (f"{bundlename.name.lower()}_{bundlename.bundle}", bundlename) for bundlename in _check_public_bundles_eligibility() ]
[docs] def check_public_bundles(): """Check and add new and delete obsolete public bundle name filenames. Return collection of updated bundle names. :param bundlenames: collection of all instances of public bundle names :type bundlenames: list :var hashes: collection of all public bundle name hashes :type hashes: set :var filenames: collection of all existing filenames/hashes found on disk :type filenames: set :var missing: collection of public bundle name hashes not found on disk :type missing: set :var obsolete: collection of filenames that aren't valid public bundle names :type obsolete: set :return: list """ bundlenames = _public_bundles_collection() hashes = {bundlename[0] for bundlename in bundlenames} filenames = _public_bundle_filenames() missing = hashes - filenames obsolete = filenames - hashes if missing: _create_public_bundle_files(missing) if obsolete: _delete_public_bundle_files(obsolete) return [ "_".join(bundle_hash.split("_")[:-1]) for bundle_hash in missing.union(obsolete) ]
[docs] def format_addresses_limit_help_text(bundlename): """Return formatted help text for provided bundle instance. :param bundlename: bundle name instance :type bundlename: :class:`core.models.BundleName` :var help_text: addresses field's help text :type help_text: str :var limit_public: number of addresses allowed in a public bundle :type limit_public: int :var public_help_text: addresses field's help text related to public bundle :type public_help_text: str """ help_text = BUNDLE_ADDRESSES_LIMIT_HELP_TEXT.format( bundlename.profile.bundle_size_limit(bundlename) ) limit_public = bundlename.profile.bundle_size_limit_for_public() public_help_text = ( PUBLIC_BUNDLE_ADDRESSES_LIMIT_HELP_TEXT.format(limit_public) if limit_public else PUBLIC_BUNDLE_ADDRESSES_NOT_ALLOWED_HELP_TEXT ) return help_text + public_help_text