# SPDX-License-Identifier: GPL-3.0-or-later
#
# Copyright (C) 2025 The Project Authors
# See pyproject.toml for authors/maintainers.
# See LICENSE for license details.
"""
{Short module description (1-3 sentences)}
todo docstring
Features
--------
todo docstring
* {feature 1}
* {feature 2}
* {feature 3}
* {etc}
Overview
--------
todo docstring
{Overview description}
Examples
--------
todo docstring
{Examples in rST}
Print a message
.. code-block:: python
# print message
print("Hello world!")
# [Output] >> 'Hello world!'
"""
# IMPORTS
# ***********************************************************************
# import modules from other libs
# Native imports
# =======================================================================
import os
import xml.etree.ElementTree as ET
# ... {develop}
# External imports
# =======================================================================
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.gridspec import GridSpec
# ... {develop}
# Project-level imports
# =======================================================================
from babilonia.root import *
# ... {develop}
# CONSTANTS
# ***********************************************************************
# define constants in uppercase
# CONSTANTS -- Project-level
# =======================================================================
# Portaria Interministerial MPS/MF nº 6
TABELA_INSS_2025 = [(1518.00, 0.075), (2793.88, 0.09), (4190.83, 0.12), (8157.41, 0.14)]
# MEDIDA PROVISÓRIA Nº 1.294, DE 11 DE ABRIL DE 2025
TABELA_IRRF_2025 = [
(2428.81, 2826.65, 0.075, 182.16),
(2826.66, 3751.05, 0.15, 394.16),
(3751.06, 4664.68, 0.225, 675.49),
(4664.68, float("inf"), 0.275, 908.73),
]
# CONSTANTS -- Module-level
# =======================================================================
# ... {develop}
# FUNCTIONS
# ***********************************************************************
# FUNCTIONS -- Project-level
# =======================================================================
# ... {develop}
# FUNCTIONS -- Module-level
# =======================================================================
# ... {develop}
# CLASSES
# ***********************************************************************
# CLASSES -- Project-level
# =======================================================================
[docs]
class Budget(RecordTable):
def __init__(self, name="MyBudget", alias="Bud"):
super().__init__(name=name, alias=alias)
# ------------- specifics attributes ------------- #
self.total_revenue = None
self.total_expenses = None
self.total_net = None
self.summary_ascend = False
def _set_fields(self):
# ------------ call super ----------- #
super()._set_fields()
# set temporary util fields
self.sign_field = "Sign"
self.value_signed = "Value_Signed"
# ... continues in downstream objects ... #
def _set_data_columns(self):
# Main data columns
self.columns_data_main = [
"Type",
"Status",
"Contract",
"Name",
"Value",
]
# Extra data columns
self.columns_data_extra = [
# Status extra
"Date_Due",
"Date_Exe",
# Name extra
# tags
"Tags",
# Values extra
# Payment details
"Method",
"Protocol",
]
# File columns
self.columns_data_files = [
"File_Receipt",
"File_Invoice",
"File_NF",
]
# concat all lists
self.columns_data = (
self.columns_data_main + self.columns_data_extra + self.columns_data_files
)
# variations
self.columns_data_status = self.columns_data_main + [
self.columns_data_extra[0],
self.columns_data_extra[1],
]
# ... continues in downstream objects ... #
def _set_operator(self):
# ------------- define sub routines here ------------- #
def func_file_status():
return FileSys.check_file_status(files=self.data["File"].values)
def func_update_status():
# filter relevante data
df = self.data[["Status", "Method", "Date_Due"]].copy()
# Convert 'Date_Due' to datetime format
df["Date_Due"] = pd.to_datetime(self.data["Date_Due"])
# Get the current date
current_dt = datetime.datetime.now()
# Update 'Status' for records with 'Automatic' method and 'Expected' status based on the condition
condition = (
(df["Method"] == "Automatic")
& (df["Status"] == "Expected")
& (df["Date_Due"] <= current_dt)
)
df.loc[condition, "Status"] = "Executed"
# return values
return df["Status"].values
# todo implement all operations
# ---------------- the operator ---------------- #
self.operator = {
"Status": func_update_status,
}
def _get_total_expenses(self, filter_df=True):
filtered_df = self._filter_prospected_cancelled() if filter_df else self.data
_n = filtered_df[filtered_df["Type"] == "Expense"]["Value_Signed"].sum()
return round(_n, 3)
def _get_total_revenue(self, filter_df=True):
filtered_df = self._filter_prospected_cancelled() if filter_df else self.data
_n = filtered_df[filtered_df["Type"] == "Revenue"]["Value_Signed"].sum()
return round(_n, 3)
def _filter_prospected_cancelled(self):
return self.data[
(self.data["Status"] != "Prospected") & (self.data["Status"] != "Cancelled")
]
[docs]
def update(self):
super().update()
if self.data is not None:
self.total_revenue = self._get_total_revenue(filter_df=True)
self.total_expenses = self._get_total_expenses(filter_df=True)
self.total_net = self.total_revenue + self.total_expenses
if self.total_net > 0:
self.summary_ascend = False
else:
self.summary_ascend = True
# ... continues in downstream objects ... #
return None
[docs]
def set_data(self, input_df):
"""
Set RecordTable data from incoming dataframe.
Expected to be incremented downstream.
:param input_df: incoming dataframe
:type input_df: dataframe
:return: None
:rtype: None
"""
super().set_data(input_df=input_df)
# convert to numeric
self.data["Value"] = pd.to_numeric(self.data["Value"])
# compute temporary field
# sign and value_signed
self.data["Sign"] = self.data["Type"].apply(
lambda x: 1 if x == "Revenue" else -1
)
self.data["Value_Signed"] = self.data["Sign"] * self.data["Value"]
[docs]
def get_summary_by_type(self):
summary = pd.DataFrame(
{
"Total_Expenses": [self.total_expenses],
"Total_Revenue": [self.total_revenue],
"Total_Net": [self.total_net],
}
)
summary = summary.apply(
lambda x: x.sort_values(ascending=self.summary_ascend), axis=1
)
return summary
[docs]
def get_summary_by_status(self, filter_df=True):
filtered_df = self._filter_prospected_cancelled() if filter_df else self.data
return (
filtered_df.groupby("Status")["Value_Signed"]
.sum()
.sort_values(ascending=self.summary_ascend)
)
[docs]
def get_summary_by_contract(self, filter_df=True):
filtered_df = self._filter_prospected_cancelled() if filter_df else self.data
return (
filtered_df.groupby("Contract")["Value_Signed"]
.sum()
.sort_values(ascending=self.summary_ascend)
)
[docs]
@staticmethod
def parse_annual_budget(year, budget_df, freq_field="Freq"):
start_date = "{}-01-01".format(year)
end_date = "{}-01-01".format(int(year) + 1)
annual_budget = pd.DataFrame()
for _, row in budget_df.iterrows():
# Generate date range based on frequency
dates = pd.date_range(start=start_date, end=end_date, freq=row["Freq"])
# Replicate the row for each date
replicated_data = pd.DataFrame(
{col: [row[col]] * len(dates) for col in df.columns}
)
replicated_data["Date_Due"] = dates
# Append to the expanded budget
annual_budget = pd.concat(
[annual_budget, replicated_data], ignore_index=True
)
return annual_budget
[docs]
class CashFlow(DataSet):
"""
A primitive class for handling Cash flow analysis
.. dropdown:: Cashflow Analysis Example
:icon: code-square
:open:
.. code-block:: python
from babilonia.accounting import CashFlow
# create an empty class
cf = CashFlow()
# set the file for CSV
file_csv = "path/to/file.csv" # [change this]
# load data
cf.load_data(file_csv)
# call method
dc = cf.cashflow_analysis(df=cf.data, category="Custeio")
# print data
print(dc["monthly"])
print(dc["yearly"])
"""
def __init__(self, name="CashFlow", alias="CF"):
super().__init__(name=name, alias=alias)
[docs]
def load_data(self, file_data):
# overwrite relative path inputs
# ----------------------------------------------
self.file_data = os.path.abspath(file_data)
# implement loading logic
# ----------------------------------------------
df = pd.read_csv(
self.file_data,
sep=self.file_csv_sep,
encoding=self.file_encoding,
dtype=str,
)
df = df[["Data", "Categoria", "Valor", "Descricao"]].copy()
# make conversions
df["Data"] = pd.to_datetime(df["Data"])
df["Valor"] = df["Valor"].astype(float)
# post-loading logic
# ----------------------------------------------
self.data = df.copy()
# update other mutables
# ----------------------------------------------
self.update()
# ... continues in downstream objects ... #
[docs]
@staticmethod
def get_cashflow_analysis(df, category=None):
"""
Perform cash flow analysis with monthly and yearly aggregation.
This method classifies cash flows into inputs and outputs, aggregates
values on a monthly and yearly basis, and computes cumulative balances.
The analysis is fully independent from class state and inheritance
behavior.
:param df:
Input cash flow data containing at least the columns
``Data``, ``Categoria`` and ``Valor``.
:type df: pandas.DataFrame
:param category:
Optional category filter. If ``None``, all categories are grouped
under ``"Geral"``.
:type category: str or None
:returns:
Dictionary with monthly and yearly cash flow summaries.
:rtype: dict
"""
df = CashFlow.enrich_time_index(df)
df = CashFlow.classify_flows(df)
df, category = CashFlow.filter_category(df, category)
df_monthly = CashFlow.get_monthly_summary(df, category)
df_monthly = CashFlow.compute_oir(df_monthly)
df_yearly = CashFlow.get_yearly_summary(df_monthly, category)
df_yearly = CashFlow.compute_oir(df_yearly)
return {
"monthly": df_monthly.round(decimals=2),
"yearly": df_yearly.round(decimals=2),
}
[docs]
@staticmethod
def compute_oir(df, input_field="Entradas", output_field="Saidas"):
df["OIR"] = df[output_field].abs() / df[input_field].abs()
df["OIR"] = df["OIR"].fillna(100)
return df
[docs]
@staticmethod
def enrich_time_index(df):
"""
Add year and year-month time indices to the cash flow data.
This method extracts the calendar year and a ``YYYY-MM`` monthly
identifier from the ``Data`` column.
:param df:
Input cash flow data.
:type df: pandas.DataFrame
:returns:
Copy of the input data with additional ``Ano`` and ``Mes`` columns.
:rtype: pandas.DataFrame
"""
df = df.copy()
df["Ano"] = df["Data"].dt.year
df["Mes"] = df["Data"].dt.strftime("%Y-%m")
return df
[docs]
@staticmethod
def classify_flows(df):
"""
Classify cash flows as inputs or outputs.
Positive or zero values are classified as ``"In"`` and negative values
as ``"Out"``.
:param df:
Cash flow data containing a ``Valor`` column.
:type df: pandas.DataFrame
:returns:
Copy of the input data with an additional ``Flow`` column.
:rtype: pandas.DataFrame
"""
df = df.copy()
df["Flow"] = np.where(df["Valor"] >= 0, "In", "Out")
return df
[docs]
@staticmethod
def filter_category(df, category):
"""
Filter cash flow data by category.
If no category is provided, all records are grouped under the
default category ``"Geral"``.
:param df:
Cash flow data.
:type df: pandas.DataFrame
:param category:
Category name used to filter the data.
:type category: str or None
:returns:
Tuple containing the filtered data and the resolved category name.
:rtype: tuple
"""
if category is None:
return df.copy(), "Geral"
return df.query("Categoria == @category").copy(), category
[docs]
@staticmethod
def get_monthly_summary(df, category):
"""
Compute monthly cash flow summaries for each year.
This method aggregates cash flow inputs and outputs on a monthly basis,
ensures that all calendar months are present, and computes annual
cumulative balances.
:param df:
Cash flow data enriched with time indices and flow classification.
:type df: pandas.DataFrame
:param category:
Category name associated with the analysis.
:type category: str
:returns:
Monthly cash flow summary table.
:rtype: pandas.DataFrame
"""
years = range(df["Ano"].min(), df["Ano"].max() + 1)
monthly_frames = []
for year in years:
calendar = pd.DataFrame(
{
"Ano": str(year),
"Mes": [f"{year}-{str(m).zfill(2)}" for m in range(1, 13)],
"Categoria": category,
}
)
df_year = df.query("Ano == @year")
inp = (
df_year.query("Flow == 'In'")
.groupby("Mes")["Valor"]
.agg(Entradas="sum", Entradas_N="count")
.reset_index()
)
out = (
df_year.query("Flow == 'Out'")
.groupby("Mes")["Valor"]
.agg(Saidas="sum", Saidas_N="count")
.reset_index()
)
df_year = (
calendar.merge(inp, on="Mes", how="left")
.merge(out, on="Mes", how="left")
.fillna(0)
)
for col in ["Entradas_N", "Saidas_N"]:
df_year[col] = df_year[col].astype(int)
# Net flow
df_year["Fluxo"] = df_year["Entradas"] + df_year["Saidas"]
df_year["Entradas_Acum"] = df_year["Entradas"].cumsum()
df_year["Saidas_Acum"] = df_year["Saidas"].cumsum()
df_year["Fluxo_Acum"] = df_year["Fluxo"].cumsum()
monthly_frames.append(df_year)
return pd.concat(monthly_frames, ignore_index=True)
[docs]
@staticmethod
def get_yearly_summary(df_monthly, category):
"""
Compute yearly cash flow summaries.
This method aggregates monthly cash flow data into yearly totals and
computes cumulative balances across years.
:param df_monthly:
Monthly cash flow summary table.
:type df_monthly: pandas.DataFrame
:param category:
Category name associated with the analysis.
:type category: str
:returns:
Yearly cash flow summary table.
:rtype: pandas.DataFrame
"""
df = (
df_monthly.groupby("Ano")
.agg(
Entradas=("Entradas", "sum"),
Entradas_N=("Entradas_N", "sum"),
Saidas=("Saidas", "sum"),
Saidas_N=("Saidas_N", "sum"),
Fluxo=("Fluxo", "sum"),
)
.reset_index()
)
df["Categoria"] = category
df["Entradas_Acum"] = df["Entradas"].cumsum()
df["Saidas_Acum"] = df["Saidas"].cumsum()
df["Fluxo_Acum"] = df["Fluxo"].cumsum()
return df[
[
"Ano",
"Categoria",
"Entradas",
"Entradas_N",
"Saidas",
"Saidas_N",
"Fluxo",
"Entradas_Acum",
"Saidas_Acum",
"Fluxo_Acum",
]
]
[docs]
@staticmethod
def get_cashflow_report(df, year=None, initial_cash=None):
"""
Build a yearly cash flow panel and summary by category.
:param df:
Cash flow DataFrame containing transactional data with date,
category, and value information compatible with the
``enrich_time_index``, ``classify_flows``, and
``cashflow_analysis`` methods.
:type df: pandas.DataFrame
:param year:
Year to be analyzed. If ``None``, the current calendar year
(local time) is used.
:type year: int, optional
:param initial_cash:
Initial account balance at the beginning of the selected year.
If ``None``, defaults to ``0.0``.
:type initial_cash: float, optional
:return:
Dictionary containing: ``"Pannel"``: monthly cash flow panel with totals, per-category
flows, and running balance. ``"Summary"``: yearly summary by category with total, mean, and
percentage contribution to total inflows.
:rtype: dict
"""
# ------------------------------------------------------------------
# Handle defaults
# ------------------------------------------------------------------
if year is None:
from datetime import datetime
year = datetime.now().year
if initial_cash is None:
initial_cash = 0.0
# ------------------------------------------------------------------
# Prepare and filter data
# ------------------------------------------------------------------
df = CashFlow.enrich_time_index(df)
df = df.query(f"Ano == {year}").copy()
# Classify flows as inflow / outflow
df = CashFlow.classify_flows(df)
# Separate inflow and outflow categories
has_category = "Categoria" in df.columns and df["Categoria"].notna().any()
if has_category:
df_inp = df.query("Flow == 'In'")
df_out = df.query("Flow == 'Out'")
ls_categories_inp = df_inp["Categoria"].dropna().unique()
ls_categories_out = df_out["Categoria"].dropna().unique()
else:
ls_categories_inp = []
ls_categories_out = []
# ------------------------------------------------------------------
# Base monthly panel (all categories aggregated)
# ------------------------------------------------------------------
dc_cfa = CashFlow.get_cashflow_analysis(df, category=None)
df_cfa = dc_cfa["monthly"][["Ano", "Mes", "Fluxo", "Entradas", "Saidas"]].copy()
# ------------------------------------------------------------------
# Add inflow categories (monthly)
# ------------------------------------------------------------------
for cat in ls_categories_inp:
dc_cat = CashFlow.get_cashflow_analysis(df, category=cat)
df_cat = dc_cat["monthly"][["Mes", "Entradas"]].copy()
df_cat.rename(columns={"Entradas": cat}, inplace=True)
df_cfa = pd.merge(df_cfa, df_cat, how="left", on="Mes")
# ------------------------------------------------------------------
# Add outflow categories (monthly)
# ------------------------------------------------------------------
for cat in ls_categories_out:
dc_cat = CashFlow.get_cashflow_analysis(df, category=cat)
df_cat = dc_cat["monthly"][["Mes", "Saidas"]].copy()
df_cat.rename(columns={"Saidas": cat}, inplace=True)
df_cfa = pd.merge(df_cfa, df_cat, how="left", on="Mes")
# ------------------------------------------------------------------
# Compute running balance
# ------------------------------------------------------------------
df_cfa["Saldo"] = initial_cash + df_cfa["Fluxo"].cumsum()
# ------------------------------------------------------------------
# Build yearly summary by category
# ------------------------------------------------------------------
total_entradas = df_cfa["Entradas"].sum()
total_saidas = df_cfa["Saidas"].sum()
media_entradas = df_cfa["Entradas"].mean()
media_saidas = df_cfa["Saidas"].mean()
rows_summary = [
{
"Ano": year,
"Categoria": "ENTRADAS",
"Total": total_entradas,
"Media": media_entradas,
"% Entradas": 100.0,
},
{
"Ano": year,
"Categoria": "SAIDAS",
"Total": total_saidas,
"Media": media_saidas,
"% Entradas": (
abs(total_saidas) / total_entradas * 100
if total_entradas != 0
else 0.0
),
},
]
ls_categories = list(ls_categories_inp) + list(ls_categories_out)
ls_categories = list(ls_categories_inp) + list(ls_categories_out)
if ls_categories:
totals = df_cfa[ls_categories].sum()
averages = df_cfa[ls_categories].mean()
pct_entradas = (
totals.abs() / total_entradas * 100 if total_entradas != 0 else 0.0
)
for cat in ls_categories:
rows_summary.append(
{
"Ano": year,
"Categoria": cat,
"Total": totals[cat],
"Media": averages[cat],
"% Entradas": pct_entradas[cat],
}
)
df_summary = pd.DataFrame(rows_summary).round(2)
# ------------------------------------------------------------------
# Output
# ------------------------------------------------------------------
return {
"Pannel": df_cfa,
"Summary": df_summary,
}
[docs]
class CashFlowBBCC(CashFlow):
"""
A class for handling CSV data from Banco do Brasil Conta Corrente.
.. dropdown:: Script example
:icon: code-square
:open:
.. code-block:: python
from babilonia.accounting import CashFlowBBCC
# create an empty class
cf = CashFlowBBCC()
# set the file for CSV
file_csv = "path/to/file.csv" # [change this]
# load data
cf.load_data(file_csv)
# standardize data
cf.standardize()
# print data
print(cf.data.head(10))
# save data
file_out = "path/to/output.csv" # [change this]
cf.data.to_csv(file_out, sep=";", index=False)
"""
def __init__(self, name="CashFlowBBCC", alias="CFBBCC"):
super().__init__(name=name, alias=alias)
# include the stages of data
self.data_raw = None
self.data_parsed = None
[docs]
def load_data(self, file_data):
"""
Load raw data from bank CSV statement
:param file_data: Bank statement CSV file path
:type file_data: str or Path
:return: None
:rtype: None
"""
# overwrite relative path inputs
# ----------------------------------------------
self.file_data = os.path.abspath(file_data)
# implement loading logic
# ----------------------------------------------
try:
df = pd.read_csv(
self.file_data,
sep=",",
quotechar='"',
encoding="cp1252", # Banco do Brasil standard
dtype=str,
keep_default_na=False,
)
except UnicodeDecodeError:
# Fallback for alternative exports
df = pd.read_csv(
self.file_data,
sep=",",
quotechar='"',
encoding="latin1",
dtype=str,
keep_default_na=False,
)
# post-loading logic
# ----------------------------------------------
df.dropna(inplace=True)
self.data_raw = df.copy()
self.data_parsed = None
self.data = None
# update other mutables
# ----------------------------------------------
self.update()
# ... continues in downstream objects ... #
return None
[docs]
def standardize(self, force=False):
"""
Standardize data into canonical format.
:param force: Rebuild parsed data even if it exists
"""
if self.data_raw is None:
raise RuntimeError("No data loaded")
if self.data_parsed is None or force:
self.data_parsed = self.parse_data(self.data_raw)
self.data = self.data_parsed.copy()
return None
[docs]
def parse_data(self, df=None):
"""
Parse data to canonical format
:param df: Optional input data
:type df: ``pandas.DataFrame``
:return: Formated data
:rtype: ``pandas.DataFrame``
"""
if df is None:
df = self.data_raw
df = df.copy()
# --- normalize legacy column names ---
df = self.normalize_columns(df)
df = self.apply_drops(df)
df["Data"] = self.parse_date(df["Data"])
df["Valor"] = self.parse_valor(df["Valor"])
df["Categoria"] = ""
df["Descricao"] = ""
if "Detalhes" not in df.columns:
df["Detalhes"] = ""
df = df[
[
"Data",
"Valor",
"Categoria",
"Descricao",
"Lançamento",
"Detalhes",
"N° documento",
]
]
df = df.rename(
columns={"Lançamento": "Lancamento", "N° documento": "Documento"}
)
return df
[docs]
def parse_date(self, series):
"""
Parse BB date field from ``DD/MM/YYYY`` to datetime.
:param series: String series
:type series: ``pandas.Series``
:return: Datetime series
:rtype: ``pandas.Series``
"""
dates = pd.to_datetime(
series,
format="%d/%m/%Y",
errors="raise",
)
return dates
[docs]
def parse_valor(self, series):
"""
Convert ``Valor`` field to float.
.. dropdown:: Examples
:open:
.. list-table::
:widths: auto
:header-rows: 1
* - Input
- Output
* - ``5.000,00``
- ``5000.00``
* - ``-403,00``
- ``-403.00``
:param series: String series
:type series: ``pandas.Series``
:return: Value series
:rtype: ``pandas.Series``
"""
s = series.astype(str).str.strip()
# Detect Brazilian format (comma as decimal separator)
is_br_format = s.str.contains(",")
# Normalize only Brazilian-formatted values
s.loc[is_br_format] = (
s.loc[is_br_format]
.str.replace(".", "", regex=False) # thousands separator
.str.replace(",", ".", regex=False) # decimal separator
)
# Convert to float
values = s.astype(float)
return values
[docs]
def apply_drops(self, df):
"""
Filter dataframe for parsing
:param df: Input data
:type df: ``pandas.DataFrame``
:return: Output data
:rtype: ``pandas.DataFrame``
"""
df = df.query("Lançamento != 'Saldo do dia'")
df = df.query("Lançamento != 'Saldo Anterior'")
df = df.query("Lançamento != 'S A L D O'")
return df
[docs]
def normalize_columns(self, df):
"""
Normalize legacy / alternative column names to the current schema.
:param df: Input data
:type df: ``pandas.DataFrame``
:return: Output data
:rtype: ``pandas.DataFrame``
"""
column_aliases = {
"Histórico": "Lançamento",
"Número do documento": "N° documento",
}
for old, new in column_aliases.items():
if old in df.columns and new not in df.columns:
df = df.rename(columns={old: new})
if "Lançamento" not in df.columns:
raise KeyError(
"Expected column 'Lançamento' (or legacy 'Histórico') not found in CSV."
)
return df
[docs]
class CashFlowBBCCPJ(CashFlowBBCC):
"""
Class for handling BB-CC for PJ accoung CSV data.
"""
def __init__(self, name="CashFlowBBCCPJ", alias="CFBBCCPJ"):
super().__init__(name=name, alias=alias)
[docs]
def parse_valor(self, series):
"""
Convert ``Valor`` field to float.
.. dropdown:: Examples
:open:
.. list-table::
:widths: auto
:header-rows: 1
* - Input
- Output
* - ``5.000,00 C``
- ``5000.00``
* - ``-403,00 D``
- ``-403.00``
:param series: String series
:type series: ``pandas.Series``
:return: Value series
:rtype: ``pandas.Series``
"""
s = series.str.strip()
# Identify credit / debit
is_credit = s.str.endswith("C")
is_debit = s.str.endswith("D")
# Remove currency markers and spaces
s = s.str.replace(r"[CD]", "", regex=True).str.strip()
# Remove thousands separator and fix decimal separator
s = s.str.replace(".", "", regex=False)
s = s.str.replace(",", ".", regex=False)
# Convert to float (absolute value)
values = s.astype(float).abs()
# Apply sign
values[is_debit] *= -1
return values
[docs]
def apply_drops(self, df):
super().apply_drops(df=df)
df = df.query("Lançamento != 'BB Rende Fácil'")
df = df.query("Valor != '0,00 C'")
return df
[docs]
class CashFlowBBPP(CashFlowBBCC):
"""
Class for handling BB-PP CSV data.
"""
def __init__(self, name="CashFlowBBPP", alias="CFBBPP"):
super().__init__(name=name, alias=alias)
[docs]
def parse_data(self, df=None):
if df is None:
df = self.data_raw
df = df.copy()
# clear up rows and columns
df = self.apply_drops(df)
# Parse dates (DD/MM/YYYY -> datetime)
df["Data"] = self.parse_date(df["Data"])
# Parse Valor to float (keep column name)
df["Valor"] = self.parse_valor(df["Valor"])
df["Categoria"] = df["Histórico"]
df["Descricao"] = ""
df = df[
[
"Data",
"Valor",
"Categoria",
"Descricao",
]
]
return df
[docs]
def parse_valor(self, series: pd.Series) -> pd.Series:
"""
Convert ``Valor`` field to float.
.. dropdown:: Examples
:open:
.. list-table::
:widths: auto
:header-rows: 1
* - Input
- Output
* - ``5.000,00 C``
- ``5000.00``
* - ``-403,00 D``
- ``-403.00``
:param series: String series
:type series: ``pandas.Series``
:return: Value series
:rtype: ``pandas.Series``
"""
s = series.str.strip()
# Identify credit / debit
is_credit = s.str.endswith("C")
is_debit = s.str.endswith("D")
# Remove currency markers and spaces
s = s.str.replace(r"[CD]", "", regex=True).str.strip()
# Remove thousands separator and fix decimal separator
s = s.str.replace(".", "", regex=False)
s = s.str.replace(",", ".", regex=False)
# Convert to float (absolute value)
values = s.astype(float).abs()
# Apply sign
values[is_debit] *= -1
return values
[docs]
def apply_drops(self, df):
return df
[docs]
class BBCDB(DataSet):
def __init__(self, name="BBCDB", alias="BBCDB"):
super().__init__(name=name, alias=alias)
[docs]
def load_data(self, file_data):
# todo docstring
from io import StringIO
# ------------------------------------------------------------------
# Internal helpers (local on purpose: used only in this workflow)
# ------------------------------------------------------------------
def _to_float_br(series: pd.Series) -> pd.Series:
"""Convert Brazilian-formatted numeric strings to float."""
return (
series.str.replace(".", "", regex=False) # thousands separator
.str.replace(",", ".", regex=False) # decimal separator
.astype(float)
)
def _parse_date(series: pd.Series, fmt: str) -> pd.Series:
"""Parse date strings using a fixed datetime format."""
return pd.to_datetime(series, format=fmt)
def _parse_day_month_with_year(series: pd.Series, year: int) -> pd.Series:
"""Append year to DD/MM dates and parse."""
return pd.to_datetime(series + f"/{year}", format="%d/%m/%Y")
# ------------------------------------------------------------------
# 1. Raw text ingestion and cleaning
# ------------------------------------------------------------------
ls_data = BBCDB.read_txt(file_data)
year_data = BBCDB.get_year(ls_data)
# Drop structural noise and normalize line content
ls_data = BBCDB.drop_lines(ls_data, contains="--")
ls_data = BBCDB.drop_lines(ls_data, contains="==")
ls_data = BBCDB.drop_blank_lines(ls_data)
# Canonical line replacements
ls_data = BBCDB.replace_lines(ls_data)
ls_data = BBCDB.replace_lines(ls_data, "\n", "")
# ------------------------------------------------------------------
# 2. Structural splitting (accounts → sections)
# ------------------------------------------------------------------
dc_accounts = BBCDB.split_accounts(ls_data)
dc_sections = BBCDB.split_sections(dc_data=dc_accounts)
# ------------------------------------------------------------------
# 3. Section-specific parsing and normalization
# ------------------------------------------------------------------
dc_data = {}
for account, sections in dc_sections.items():
dc_account_data = {}
for section, lines in sections.items():
text = "\n".join(lines)
df = pd.read_csv(StringIO(text), sep=self.file_csv_sep, dtype=str)
# -----------------------------
# Section-specific normalization
# -----------------------------
if section == "EXTRATO":
df["Data"] = _parse_day_month_with_year(df["Data"], year_data)
df["Valor"] = _to_float_br(df["Valor"])
df = df.rename(columns={"Historico": "Categoria"})
df["Descricao"] = ""
df = df[["Data", "Valor", "Categoria", "Descricao"]]
elif section == "RENDIMENTOS":
df["Data"] = _parse_day_month_with_year(df["Data"], year_data)
df["Rendimento_Bruto"] = _to_float_br(df["Rendimento_Bruto"])
elif section == "SALDOS":
df["Data"] = _parse_date(df["Data"], "%d/%m/%Y")
for col in [
"Capital_Inicial",
"Juros",
"IR_Projetado",
"Capital_Projetado",
]:
df[col] = _to_float_br(df[col])
elif section == "DEPOSITOS":
df["Data_Aplicacao"] = _parse_date(df["Data_Aplicacao"], "%d/%m/%Y")
df["Data_Vencimento"] = _parse_date(
df["Data_Vencimento"], "%d/%m/%Y"
)
for col in ["Capital", "Saldo", "Taxa"]:
df[col] = _to_float_br(df[col])
dc_account_data[section] = df
dc_data[account] = dc_account_data
self.data = dc_data.copy()
return None
[docs]
@staticmethod
def read_txt(file_txt, encoding="cp1252"):
with open(file_txt, encoding=encoding) as f:
lines = f.readlines()
ls = []
for line in lines:
ls.append(line[:])
return ls
[docs]
@staticmethod
def drop_blank_lines(lines):
return [line for line in lines if line.strip()]
[docs]
@staticmethod
def drop_lines(lines, contains="-"):
return [line for line in lines if contains not in line]
[docs]
@staticmethod
def replace_lines(lines, contains="\xa0", relacer=" "):
return [line.replace(contains, relacer) for line in lines]
[docs]
@staticmethod
def split_accounts(lines):
# Mapping of account headers to their terminating marker.
# If the value is None, collection continues until end of file.
dc_accounts = {
"BB CDB DI": "BB CDB PROGRESSIVO",
"BB CDB PROGRESSIVO": None,
}
dc_accounts_names = {
"BB CDB DI": "CDBDI",
"BB CDB PROGRESSIVO": "CDBPG",
}
ls_accounts = list(dc_accounts.keys())
b_collect = False # Indicates whether lines are currently being collected
dc_data = {}
for account in ls_accounts:
ls_data = []
# Scan the full file line-by-line, collecting the block for this account
for line in lines[:]:
# Start collecting when the account header is found
if account in line:
b_collect = True
# Stop collecting when the next account header is found (if defined)
if dc_accounts[account] is not None and dc_accounts[account] in line:
b_collect = False
# Collect only lines within the active account block
if b_collect:
ls_data.append(line)
# Store a copy of the collected block for this account
dc_data[dc_accounts_names[account]] = ls_data[:]
return dc_data
[docs]
@staticmethod
def split_sections(dc_data):
sections = {}
section_titles = {
"EXTRATO": {"START": None, "END": "SALDO NOS ULTIMOS 6 MESES"},
"SALDOS": {
"START": "SALDO NOS ULTIMOS 6 MESES",
"END": "RESUMO DOS DEPOSITOS EM SER",
},
"DEPOSITOS": {
"START": "RESUMO DOS DEPOSITOS EM SER",
"END": "RENDIMENTO BRUTO NO PERIODO POR DEPOSITO",
},
"RENDIMENTOS": {
"START": "RENDIMENTO BRUTO NO PERIODO POR DEPOSITO",
"END": None,
},
}
# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------
def _collect_block(lines, start=None, end=None):
"""Collect lines between start and end markers (inclusive start)."""
collected = []
b_collect = start is None
for line in lines:
if start and start in line:
b_collect = True
if end and end in line:
break
if b_collect:
collected.append(line[:])
return collected
def _normalize_lines(lines):
"""Collapse whitespace and convert to semicolon-separated format."""
return [re.sub(r"\s+", ";", line.strip()) for line in lines]
def _rewrite_header_and_trim(lines, header):
"""Replace header row and drop section title line."""
lines[1] = header
return lines[1:]
# ------------------------------------------------------------------
# Main logic
# ------------------------------------------------------------------
dc_data_out = {}
for account, lines in dc_data.items():
dc_account_data = {}
for title, markers in section_titles.items():
ls_data = _collect_block(
lines,
start=markers["START"],
end=markers["END"],
)
# -----------------------------
# Section-specific reshaping
# -----------------------------
if title == "EXTRATO":
# Remove non-tabular summary lines
ls_data = [
line
for line in ls_data
if not any(
s in line
for s in ("Saldo anterior", "capital", "Saldo final")
)
]
ls_data[1] = "Data;Historico;Deposito;Valor"
# Merge paired rows into single logical records
if len(ls_data) > 3:
merged = ls_data[:2]
body = ls_data[2:]
for i in range(0, len(body), 2):
line = body[i] + body[i + 1]
line = (
line.replace("-", "")
.replace("valor juros", "")
.replace("Rendimento mensal", "Juros")
)
merged.append(line)
ls_data = merged[1:]
elif title == "SALDOS":
ls_data = _rewrite_header_and_trim(
ls_data,
"Data;Capital_Inicial;Juros;IR_Projetado;Capital_Projetado",
)
elif title == "DEPOSITOS":
ls_data = _rewrite_header_and_trim(
ls_data,
"Deposito;Data_Aplicacao;Capital;Saldo;Taxa;Data_Vencimento",
)
elif title == "RENDIMENTOS":
ls_data = _rewrite_header_and_trim(
ls_data,
"Data;Deposito;Rendimento_Bruto",
)
# Final canonical formatting
dc_account_data[title] = _normalize_lines(ls_data)
dc_data_out[account] = dc_account_data.copy()
return dc_data_out
[docs]
@staticmethod
def get_year(lines):
for line in lines:
if "Período: " in line:
ls1 = line.split(":")
ls2 = ls1[1].split("/")
return int(ls2[2][:4])
[docs]
class NFSe(DataSet):
"""
Class for handling NFSe XML data.
"""
def __init__(self, name="NFSeDataSet", alias="NFSe"):
"""
Initialize the NFSe object.
"""
super().__init__(name=name, alias=alias)
self.date = None
self.emitter = None
self.taker = None
self.service_value = None
self.service_value_trib = None
self.service_id = None
self.project_alias = None
def __str__(self):
"""
Nicely formatted string representation of the NFSe data.
"""
if self.data is None:
return "No data loaded."
# Format the main NFSe data
nfse_info = (
f"NFSe ID: {self.data.get('nfse_id', 'N/A')}\n"
f"Local de Emissão: {self.data.get('local_emissao', 'N/A')}\n"
f"Local de Prestação: {self.data.get('local_prestacao', 'N/A')}\n"
f"Número da NFSe: {self.data.get('numero_nfse', 'N/A')}\n"
f"Código de Local de Incidência: {self.data.get('codigo_local_incidencia', 'N/A')}\n"
f"Descrição do Serviço: {self.data.get('descricao_servico', 'N/A')}\n"
f"Valor Líquido: {self.data.get('valor_liquido', 'N/A')}\n"
f"Data do Processo: {self.data.get('data_processo', 'N/A')}\n"
f"Data Competência: {self.date}\n"
)
# Format the emitente (issuer) information
emitente = self.data.get(self.emitter_field, {})
emitente_info = (
f"Prestador:\n"
f" CNPJ: {emitente.get('cnpj', 'N/A')}\n"
f" Nome: {emitente.get('nome', 'N/A')}\n"
f" Endereço:\n"
f" Logradouro: {emitente.get('endereco', {}).get('logradouro', 'N/A')}\n"
f" Número: {emitente.get('endereco', {}).get('numero', 'N/A')}\n"
f" Bairro: {emitente.get('endereco', {}).get('bairro', 'N/A')}\n"
f" Cidade: {emitente.get('endereco', {}).get('cidade', 'N/A')}\n"
f" UF: {emitente.get('endereco', {}).get('uf', 'N/A')}\n"
f" CEP: {emitente.get('endereco', {}).get('cep', 'N/A')}\n"
f" Telefone: {emitente.get('telefone', 'N/A')}\n"
f" Email: {emitente.get('email', 'N/A')}\n"
)
# Format the tomador (receiver) information
tomador = self.data.get(self.taker_field, {})
tomador_info = (
f"Tomador:\n"
f" CNPJ: {tomador.get('cnpj', 'N/A')}\n"
f" Nome: {tomador.get('nome', 'N/A')}\n"
f" Endereço:\n"
f" Logradouro: {tomador.get('endereco', {}).get('logradouro', 'N/A')}\n"
f" Número: {tomador.get('endereco', {}).get('numero', 'N/A')}\n"
f" Complemento: {tomador.get('endereco', {}).get('complemento', 'N/A')}\n"
f" Bairro: {tomador.get('endereco', {}).get('bairro', 'N/A')}\n"
f" Cidade: {tomador.get('endereco', {}).get('cidade', 'N/A')}\n"
f" CEP: {tomador.get('endereco', {}).get('cep', 'N/A')}\n"
)
# Format the service information
servico = self.data.get("servico", {})
servico_info = (
f"Serviço:\n"
f" Código do Serviço: {servico.get('codigo_servico', 'N/A')}\n"
f" Descrição: {servico.get('descricao_servico', 'N/A')}\n"
f" Valor do Serviço: {servico.get('valor_servico', 'N/A')}\n"
)
# Combine all sections into one string
return f"{nfse_info}\n{emitente_info}\n{tomador_info}\n{servico_info}"
def _set_fields(self):
# ------------ call super ----------- #
super()._set_fields()
# Attribute fields
self.date_field = "Date"
self.emitter_field = "Prestador"
self.taker_field = "Tomador"
self.service_value_field = "ValorServico"
self.service_value_trib_field = "PTributoSN"
self.service_id_field = "ServicoID"
self.project_alias_field = "Projeto"
# ... continues in downstream objects ... #
[docs]
def load_data(self, file_data):
"""
Load and parse XML data from the provided file.
:param file_data: file path to the NFSe XML data.
:type file_data: str
:return: None
"""
# Ensure the file path is absolute
file_data = os.path.abspath(file_data)
# print(file_data)
tree = ET.parse(file_data)
root = tree.getroot()
# Namespaces used in the XML
ns = {
"default": "http://www.sped.fazenda.gov.br/nfse",
"ds": "http://www.w3.org/2000/09/xmldsig#",
}
# Dictionary to hold extracted XML data
nfse_data = {}
# Extract main NFSe data
nfse_data["nfse_id"] = root.find(".//default:infNFSe", ns).attrib.get("Id")
nfse_data["local_emissao"] = root.find(".//default:xLocEmi", ns).text
nfse_data["local_prestacao"] = root.find(".//default:xLocPrestacao", ns).text
nfse_data["numero_nfse"] = root.find(".//default:nNFSe", ns).text
nfse_data["codigo_local_incidencia"] = root.find(
".//default:cLocIncid", ns
).text
nfse_data["descricao_servico"] = root.find(".//default:xTribNac", ns).text
nfse_data["valor_liquido"] = float(root.find(".//default:vLiq", ns).text)
nfse_data["data_processo"] = root.find(".//default:dhProc", ns).text
nfse_data[self.date_field] = root.find(".//default:dCompet", ns).text
# Extract emitente (issuer) data
emitente = root.find(".//default:emit", ns)
nfse_data[self.emitter_field] = {
"cnpj": emitente.find(".//default:CNPJ", ns).text,
"nome": emitente.find(".//default:xNome", ns).text,
"endereco": {
"logradouro": emitente.find(
".//default:enderNac/default:xLgr", ns
).text,
"numero": emitente.find(".//default:enderNac/default:nro", ns).text,
"bairro": emitente.find(".//default:enderNac/default:xBairro", ns).text,
"cidade": emitente.find(".//default:enderNac/default:cMun", ns).text,
"uf": emitente.find(".//default:enderNac/default:UF", ns).text,
"cep": emitente.find(".//default:enderNac/default:CEP", ns).text,
},
"telefone": emitente.find(".//default:fone", ns).text,
"email": emitente.find(".//default:email", ns).text,
}
# Extract tomador (receiver) data
tomador = root.find(".//default:toma", ns)
nfse_data[self.taker_field] = {
"cnpj": (
tomador.find(".//default:CNPJ", ns).text
if tomador.find(".//default:CNPJ", ns) is not None
else None
),
"nif": (
tomador.find(".//default:NIF", ns).text
if tomador.find(".//default:NIF", ns) is not None
else None
),
"nome": tomador.find(".//default:xNome", ns).text,
}
# print()
# print(nfse_data[self.taker_field]["nome"])
_address = {
"logradouro": (
tomador.find(".//default:end/default:xLgr", ns).text
if tomador.find(".//default:end/default:xLgr", ns) is not None
else None
),
"numero": (
tomador.find(".//default:end/default:nro", ns).text
if tomador.find(".//default:end/default:nro", ns) is not None
else None
),
"complemento": (
tomador.find(".//default:end/default:xCpl", ns).text
if tomador.find(".//default:end/default:xCpl", ns) is not None
else None
),
"bairro": (
tomador.find(".//default:end/default:xBairro", ns).text
if tomador.find(".//default:end/default:xBairro", ns) is not None
else None
),
"cidade": (
tomador.find(".//default:end/default:endNac/default:cMun", ns).text
if tomador.find(".//default:end/default:endNac/default:cMun", ns)
is not None
else None
),
"cep": (
tomador.find(".//default:end/default:endNac/default:CEP", ns).text
if tomador.find(".//default:end/default:endNac/default:CEP", ns)
is not None
else None
),
}
nfse_data[self.taker_field]["endereco"] = _address.copy()
# Extract service data
servico = root.find(".//default:serv", ns)
nfse_data["servico"] = {
"codigo_servico": servico.find(
".//default:cServ/default:cTribNac", ns
).text,
"descricao_servico": servico.find(
".//default:cServ/default:xDescServ", ns
).text,
}
valor_servico_element = root.find(
".//default:valores/default:vServPrest/default:vServ", ns
)
nfse_data[self.service_value_field] = float(valor_servico_element.text)
nfse_data["servico"]["valor_servico"] = nfse_data[self.service_value_field]
tribut_element = root.find(
".//default:valores/default:trib/default:totTrib/default:pTotTribSN", ns
)
if tribut_element is None:
# Handle
v_tb = 6.0
else:
v_tb = float(str(tribut_element.text))
nfse_data["servico"]["p_tributo_SN"] = v_tb
# Set parsed data to the class attribute
self.data = nfse_data
self.date = nfse_data[self.date_field]
self.file_data = file_data
self.emitter = (
self.data[self.emitter_field]["cnpj"]
+ " -- "
+ self.data[self.emitter_field]["nome"]
)
# hande NIF or CNPJ
if self.data[self.taker_field]["cnpj"] is not None:
self.taker = (
self.data[self.taker_field]["cnpj"]
+ " (CNPJ) -- "
+ self.data[self.taker_field]["nome"]
)
elif self.data[self.taker_field]["nif"] is not None:
self.taker = (
self.data[self.taker_field]["nif"]
+ " (NIF) -- "
+ self.data[self.taker_field]["nome"]
)
else:
self.taker = self.data[self.taker_field]["nome"]
self.service_value = self.data[self.service_value_field]
self.service_value_trib = nfse_data["servico"]["p_tributo_SN"]
self.service_id = self.data["servico"]["codigo_servico"]
[docs]
class NFSeColl(Collection):
def __init__(self, base_object=NFSe, name="MyNFeCollection", alias="NFeCol0"):
"""
Initialize the ``NFSeColl`` object.
:param base_object: ``MbaE``-based object for collection
:type base_object: :class:`MbaE`
:param name: unique object name
:type name: str
:param alias: unique object alias. If None, it takes the first and last characters from name
:type alias: str
"""
# ------------ set pseudo-static ----------- #
self.object_alias = "NFE_COL"
# Set the name and baseobject attributes
self.baseobject = base_object
self.baseobject_name = base_object.__name__
# Initialize the catalog with an empty DataFrame
dict_metadata = self.baseobject().get_metadata()
self.catalog = pd.DataFrame(columns=dict_metadata.keys())
# Initialize the ``Collection`` as an empty dictionary
self.collection = dict()
# ------------ set mutables ----------- #
self.size = 0
self._set_fields()
# ... continues in downstream objects ... #
[docs]
def load_folder(self, folder):
"""
Load NFSe files from a folder
:param folder: path to folder
:type folder: str
:return: None
:rtype: None
"""
from glob import glob
lst_files = glob("{}/*.xml".format(folder))
self.load_files(lst_files=lst_files)
[docs]
def load_files(self, lst_files):
"""
Load NFSe files from a list of files
:param lst_files: list of paths to files
:type lst_files: list
:return: None
:rtype: None
"""
for f in lst_files:
nfe_id = "NFSe_" + os.path.basename(f).split(".")[0]
nfe = NFSe(name=nfe_id, alias=nfe_id)
nfe.load_data(file_data=f)
self.append(new_object=nfe)
# CLASSES -- Module-level
# =======================================================================
# ... {develop}
# SCRIPT
# ***********************************************************************
# standalone behaviour as a script
if __name__ == "__main__":
# Test doctests
# ===================================================================
import doctest
doctest.testmod()
# Script section
# ===================================================================
print("Hello world!")
# ... {develop}
# Script subsection
# -------------------------------------------------------------------
# ... {develop}