Archive for the ‘Python / SciPy / pandas’ Category.

dbt and the ECB f/x Rates – Part 3

This entry is part 3 of 3 in the series dbt - ECB FX Example

In this final article of this series I will add another data mart. In the models/marts subfolder we add fx_current_rates.sql with the following SQL code:

{{ config(materialized='view') }}
 
SELECT
    currency,
    valid_from,
    rate_vs_eur
FROM {{ ref('fx_rates_with_validity') }}
WHERE is_current = TRUE

This creates a new view on top of the "fx_rates_with_validity" mart. Since the view is materialized in the Postgres DB, we can query it with HeidiSQL (or any other SQL client).
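To pull the current rates into a Python script, pandas can run the same SELECT through a database connection. The following is a minimal sketch: an in-memory SQLite database stands in for the Postgres DB so that it runs without a server (an assumption), and the rows are made-up sample values, not real ECB rates. Against Postgres you would pass a SQLAlchemy engine to pd.read_sql and select from the view in the dbt schema instead.

```python
import sqlite3

import pandas as pd

# In-memory SQLite as a stand-in for the Postgres DB (assumption for this sketch)
con = sqlite3.connect(":memory:")
con.executescript("""
    CREATE TABLE fx_rates_with_validity (
        currency TEXT, valid_from TEXT, valid_to TEXT,
        rate_vs_eur REAL, is_current INTEGER
    );
    -- made-up sample rows, not real ECB data
    INSERT INTO fx_rates_with_validity VALUES
        ('USD', '2025-01-02', '2025-01-02', 1.0300, 0),
        ('USD', '2025-01-03', NULL,         1.0400, 1),
        ('JPY', '2025-01-02', NULL,         163.00, 1);
    -- same filter as the fx_current_rates model (SQLite stores booleans as 0/1)
    CREATE VIEW fx_current_rates AS
        SELECT currency, valid_from, rate_vs_eur
        FROM fx_rates_with_validity
        WHERE is_current = 1;
""")

# read the view into a DataFrame
current = pd.read_sql("SELECT * FROM fx_current_rates ORDER BY currency", con)
print(current)
con.close()
```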

dbt and the ECB f/x Rates – Part 2

This entry is part 2 of 3 in the series dbt - ECB FX Example

In this article we will continue our dbt journey. With dbt-core installed we can ask dbt to set up a new project for us.

Simply go to the folder in which your project folder should be created and run dbt init projectname. dbt will then ask you for the database adapter and the connection settings, as well as for the schema in which dbt should build its objects.

After these questions, dbt tells you that you can now run dbt debug to check whether everything is fine. BUT: you first need to change into the newly created project subdirectory, otherwise you get a rather meaningless error message:


Encountered an error:
Internal Error
Profile should not be None if loading profile completed

When run in the correct subdirectory, dbt debug should not report any errors. If it does, run the error message through the search engine of your choice.

C:\Users\UWe\Desktop\fx_project>dbt debug
16:52:20  Running with dbt=1.10.13
16:52:20  dbt version: 1.10.13
16:52:20  python version: 3.12.6
16:52:20  python path: C:\Python312\python.exe
16:52:20  os info: Windows-11-10.0.26200-SP0
16:52:20  Using profiles dir at C:\Users\UWe\.dbt
16:52:20  Using profiles.yml file at C:\Users\UWe\.dbt\profiles.yml
16:52:20  Using dbt_project.yml file at C:\Users\UWe\Desktop\fx_project\dbt_project.yml
16:52:20  adapter type: postgres
16:52:20  adapter version: 1.9.1
16:52:20  Configuration:
16:52:20    profiles.yml file [OK found and valid]
16:52:20    dbt_project.yml file [OK found and valid]
16:52:20  Required dependencies:
16:52:20   - git [OK found]

16:52:20  Connection:
16:52:20    host: localhost
16:52:20    port: 5432
16:52:20    user: postgres
16:52:20    database: FX
16:52:20    schema: dbt
16:52:20    connect_timeout: 10
16:52:20    role: None
16:52:20    search_path: None
16:52:20    keepalives_idle: 0
16:52:20    sslmode: None
16:52:20    sslcert: None
16:52:20    sslkey: None
16:52:20    sslrootcert: None
16:52:20    application_name: dbt
16:52:20    retries: 1
16:52:20  Registered adapter: postgres=1.9.1
16:52:20    Connection test: [OK connection ok]

16:52:20  All checks passed!

The project folder contains a set of subfolders. Remove the models/example subfolder, as it may cause error messages.

Next, we will define our source in YAML and create some basic models.

In models, create sources.yml with the following content:

version: 2
sources:
  - name: raw
    schema: raw
    tables:
      - name: ecb_fx_rates

In models, create a subfolder staging and add stg_fx_rates.sql to this staging subfolder with the following content:

{{ config(materialized='view') }}
SELECT
    fx_date,
    currency,
    rate,
    created_at
FROM {{ source('raw', 'ecb_fx_rates') }}

This simply selects all the data from the ecb_fx_rates table in our raw schema.

Next comes the interesting part: using a window function, each row gets a valid_from and a valid_to date.

In models, create a subfolder marts and add fx_rates_with_validity.sql to this marts subfolder with the following content:

{{ config(materialized='table') }}
 
WITH base AS (
    SELECT
        fx_date,
        currency,
        rate
    FROM {{ ref('stg_fx_rates') }}
),
 
with_validity AS (
    SELECT
        currency,
        fx_date                        AS valid_from,
        -- valid_to is the day before the next rate, or null if it's the latest;
        -- date - interval returns a timestamp in Postgres, so cast back to date
        (lead(fx_date) OVER (
            partition BY currency
            ORDER BY fx_date
        ) - INTERVAL '1 day')::date      AS valid_to,
        rate                             AS rate_vs_eur,
        -- flag the currently active rate
        CASE
            WHEN lead(fx_date) OVER (
                partition BY currency ORDER BY fx_date
            ) IS NULL THEN TRUE
            ELSE FALSE
        END                              AS is_current
    FROM base
)
 
SELECT * FROM with_validity
ORDER BY currency, valid_from

This gives us data like the following:

currency  valid_from  valid_to    rate_vs_eur  is_current
USD       2025-01-02  2025-01-02  1.0312       false
USD       2025-01-03  2025-01-05  1.0345       false
USD       2025-01-06  null        1.0821       true
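The LEAD() logic of the model can be cross-checked in pandas: within each currency, shift(-1) returns the next row's date, just as LEAD(fx_date) does inside its partition. This is a sketch on made-up sample rows shaped like stg_fx_rates, not part of the dbt project; NaT plays the role of SQL NULL.

```python
import pandas as pd

# made-up sample rows in the shape of stg_fx_rates (not real ECB data)
base = pd.DataFrame({
    "fx_date": pd.to_datetime(["2025-01-02", "2025-01-03", "2025-01-06",
                               "2025-01-02", "2025-01-03"]),
    "currency": ["USD", "USD", "USD", "JPY", "JPY"],
    "rate": [1.03, 1.04, 1.05, 163.0, 162.0],
})

# ORDER BY currency, fx_date; fx_date becomes valid_from
df = base.sort_values(["currency", "fx_date"]).rename(columns={"fx_date": "valid_from"})

# shift(-1) per currency corresponds to LEAD(fx_date) OVER (PARTITION BY currency ORDER BY fx_date)
next_date = df.groupby("currency")["valid_from"].shift(-1)

# day before the next rate; NaT (NULL) for the latest rate
df["valid_to"] = next_date - pd.Timedelta(days=1)
df["is_current"] = next_date.isna()
print(df)
```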

It also makes sense now to add some data checks.

In models, create schema.yml with the following content:

version: 2

models:
  - name: stg_fx_rates
    columns:
      - name: fx_date
        tests: [not_null]
      - name: currency
        tests: [not_null]
      - name: rate
        tests: [not_null]

  - name: fx_rates_with_validity
    columns:
      - name: currency
        tests: [not_null]
      - name: valid_from
        tests: [not_null]
      - name: is_current
        tests: [not_null]
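Conceptually, a not_null test is a query for rows in which the column is NULL; the test passes if that query finds nothing. The sketch below mimics this idea with SQLite and a made-up table to show what is being checked. The helper not_null_failures is hypothetical and not part of dbt, which generates and runs the actual test SQL itself.

```python
import sqlite3


def not_null_failures(con, table, column):
    """Count rows that would fail a not_null check (hypothetical helper)."""
    # identifiers cannot be bound as parameters, so only pass trusted names here
    sql = f"SELECT COUNT(*) FROM {table} WHERE {column} IS NULL"
    return con.execute(sql).fetchone()[0]


con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE stg_fx_rates (fx_date TEXT, currency TEXT, rate REAL)")
con.executemany(
    "INSERT INTO stg_fx_rates VALUES (?, ?, ?)",
    [("2025-01-02", "USD", 1.03), ("2025-01-02", "JPY", None)],  # made-up rows
)

for column in ("fx_date", "currency", "rate"):
    failures = not_null_failures(con, "stg_fx_rates", column)
    print(column, "PASS" if failures == 0 else f"FAIL ({failures} rows)")
```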

We can now run the whole setup:

dbt run
dbt test
dbt docs generate
dbt docs serve

Going forward

Since the ECB publishes the latest rates every working day at around 16:00 CET, the Python job that fetches them should run daily (naturally after 16:00 CET). Once the raw database has been updated, you can update the dbt models:

dbt run --select stg_fx_rates fx_rates_with_validity
dbt test --select stg_fx_rates fx_rates_with_validity
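If the fetch job is scheduled more often than once a day, a small guard can check whether new rates are to be expected yet. This is a sketch under several assumptions: the time zone Europe/Berlin (which switches between CET and CEST like Frankfurt), a cut-off at 16:00, and skipping weekends only. ECB/TARGET holidays are not considered, and the function name ecb_rates_expected is hypothetical.

```python
from datetime import datetime, time
from zoneinfo import ZoneInfo

FRANKFURT = ZoneInfo("Europe/Berlin")  # CET in winter, CEST in summer
PUBLICATION_TIME = time(16, 0)         # assumed cut-off, the ECB publishes around this time


def ecb_rates_expected(now: datetime) -> bool:
    """Return True if today's reference rates should already be published.

    Only weekends are skipped; TARGET holidays are ignored in this sketch.
    """
    local = now.astimezone(FRANKFURT)
    if local.weekday() >= 5:  # 5 = Saturday, 6 = Sunday
        return False
    return local.time() >= PUBLICATION_TIME


print(ecb_rates_expected(datetime.now(tz=FRANKFURT)))
```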

In the final part of the series, we will add another data mart for the latest rates.

Building payment plans with Python

For a small project I needed a way to generate flexible payment plans. The following Python code does exactly that and also generates matching INSERT statements for Postgres.

The CREATE TABLE statement is as follows:

CREATE TABLE paymentplan (
    contract INTEGER NOT NULL,
    paymentdate INTEGER NOT NULL,
    paymenttype INTEGER NOT NULL,
    amount NUMERIC(12, 2) NOT NULL
);

The Python code:

import pandas as pd
from datetime import datetime
from dateutil.relativedelta import relativedelta
 
def genPaymentPlan(contract, principal, paymentsperyear, interestrate, startdate, years):
    # interest per period: annual rate on the full principal, split over the payments
    interest = interestrate / 100 * principal / paymentsperyear
    start = datetime.strptime(startdate, '%Y%m%d').date()
    # months between two payments; relativedelta needs whole months,
    # so paymentsperyear should divide 12
    step = 12 // paymentsperyear
    rows = []
 
    # type 1: payout of the principal on the start date
    rows.append({'Contract': contract, 'Paymentdate': startdate, 'Paymenttype': 1, 'Amount': principal})
 
    # type 2: interest payments, always counted from the start date to avoid
    # day-of-month drift (e.g. Jan 31 -> Feb 28 -> Mar 28)
    date = start
    for payment in range(1, paymentsperyear * years + 1):
        date = start + relativedelta(months=payment * step)
        rows.append({'Contract': contract, 'Paymentdate': date.strftime('%Y%m%d'), 'Paymenttype': 2, 'Amount': -interest})
 
    # type 3: repayment of the principal on the date of the last interest payment
    rows.append({'Contract': contract, 'Paymentdate': date.strftime('%Y%m%d'), 'Paymenttype': 3, 'Amount': -principal})
 
    # build the DataFrame once instead of calling pd.concat for every row
    return pd.DataFrame(rows, columns=['Contract', 'Paymentdate', 'Paymenttype', 'Amount'])
 
 
test = genPaymentPlan(123458, 2000, 2, 12, '20260101', 4)
#print(test)
 
# Generate INSERT statements
print("Generated SQL INSERT statements:\n")
 
for index, row in test.iterrows():
    contract = int(row['Contract'])
    date = row['Paymentdate']  # string in format YYYYMMDD, fits the INTEGER column
    ptype = int(row['Paymenttype'])  # not named 'type' to avoid shadowing the built-in
    amount = float(row['Amount'])
 
    insert = f"INSERT INTO paymentplan (contract, paymentdate, paymenttype, amount) VALUES ({contract}, {date}, {ptype}, {amount:.2f});"
    print(insert)
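Building SQL strings with f-strings works for this generator, but letting the database driver bind the values avoids quoting problems. The sketch below uses sqlite3 from the standard library with an in-memory database as a stand-in for Postgres (an assumption, so it runs without a server); with psycopg2 the pattern is the same, except that the placeholder is %s instead of ?. The three rows are the first entries of the plan generated above, typed in by hand.

```python
import sqlite3

# first rows of the payment plan for contract 123458 (typed in for this sketch)
rows = [
    (123458, 20260101, 1, 2000.00),
    (123458, 20260701, 2, -120.00),
    (123458, 20270101, 2, -120.00),
]

con = sqlite3.connect(":memory:")  # stand-in for the Postgres DB
con.execute("""
    CREATE TABLE paymentplan (
        contract INTEGER NOT NULL,
        paymentdate INTEGER NOT NULL,
        paymenttype INTEGER NOT NULL,
        amount NUMERIC(12, 2) NOT NULL
    )
""")

# placeholders instead of f-strings: the driver takes care of quoting and types
con.executemany(
    "INSERT INTO paymentplan (contract, paymentdate, paymenttype, amount) VALUES (?, ?, ?, ?)",
    rows,
)
con.commit()

total = con.execute("SELECT SUM(amount) FROM paymentplan WHERE paymenttype = 2").fetchone()[0]
print(total)
```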

Converting CSV to Excel with Python pandas

Here is a simple code snippet for turning CSV files into Excel files.

It is also advisable to set the engine that writes the Excel file explicitly. If xlsxwriter is not installed, pandas writes .xlsx files with openpyxl; in my tests, however, xlsxwriter was about 30% faster.

import pandas as pd
from openpyxl import Workbook
from openpyxl.utils.dataframe import dataframe_to_rows
 
 
# output control: names of the tab-separated CSV files, without the .csv extension
files = ['']
 
wb = Workbook()
wb.remove(wb['Sheet'])  # drop the default empty sheet
 
for file in files:
    temp = pd.read_csv(file + '.csv', sep='\t')
    ws1 = wb.create_sheet()
    ws1.title = file[:31]  # Excel sheet names are limited to 31 characters
 
    # write header and data rows; append() always fills the next free row
    for row in dataframe_to_rows(temp, index=False, header=True):
        ws1.append(row)
 
wb.save('files45646.xlsx')
wb.close()
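The code above fills the sheets via openpyxl directly. With pandas alone, the engine can be chosen through pd.ExcelWriter, as recommended above. A minimal sketch, assuming tab-separated files as in the code above and that xlsxwriter is installed (pass engine="openpyxl" otherwise); the function name csv_to_excel and the file names in the usage line are hypothetical.

```python
from pathlib import Path

import pandas as pd


def csv_to_excel(csv_files, xlsx_path, engine="xlsxwriter"):
    """Write each tab-separated CSV file to its own sheet of one Excel file."""
    with pd.ExcelWriter(xlsx_path, engine=engine) as writer:
        for csv_file in csv_files:
            df = pd.read_csv(csv_file, sep="\t")
            # sheet name = file name without extension, cut to Excel's 31-character limit
            df.to_excel(writer, sheet_name=Path(csv_file).stem[:31], index=False)


# usage (hypothetical file names):
# csv_to_excel(["umsaetze.csv", "kunden.csv"], "export.xlsx")
```

The with block closes the writer, which is when the file is actually saved.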

Replacing values in pandas DataFrames with replace()

Prompted by a recent occasion, here is a simple example of how to replace values in pandas DataFrames:

import pandas as pd
 
# initialize data of lists.
data = {'Hersteller': ['VW', 'BMW', 'VW', 'Porsche'],
        'Modell': ['Golf', '1er', 'Polo', '911']}
 
 
df = pd.DataFrame(data)
 
 
print(df,'\n')
 
 
df['Hersteller'] = df['Hersteller'].replace(
    {"VW": "Volkswagen", "Horch": "Audi"})
 
 
print(df)
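One detail worth knowing: replace() only touches values that appear in the mapping and leaves everything else (here "BMW" and "Porsche") unchanged, while keys that never occur, like "Horch", are simply ignored. map() with the same dictionary would instead turn every unmapped value into NaN. A short comparison on the same sample data; the nested dictionary form {column: {old: new}} restricts the replacement to one column of a DataFrame.

```python
import pandas as pd

hersteller = pd.Series(["VW", "BMW", "VW", "Porsche"])
mapping = {"VW": "Volkswagen", "Horch": "Audi"}

replaced = hersteller.replace(mapping)  # unmapped values are kept
mapped = hersteller.map(mapping)        # unmapped values become NaN

print(replaced.tolist())  # ['Volkswagen', 'BMW', 'Volkswagen', 'Porsche']
print(mapped.tolist())    # ['Volkswagen', nan, 'Volkswagen', nan]

# nested dict: only the column "Hersteller" is affected
df = pd.DataFrame({"Hersteller": hersteller, "Modell": ["Golf", "1er", "Polo", "911"]})
df = df.replace({"Hersteller": mapping})
print(df)
```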

DuckDB example code for Python

Here is a code example covering the most important functions of DuckDB.

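import duckdb as ddb
import pandas as pd  # pandas must be installed for .to_df()
 
# in-memory database, gone when the connection is closed
con = ddb.connect(':memory:')
 
# persistent database stored in the file my_database.db
con_p = ddb.connect('my_database.db')
 
con_p.execute('CREATE OR REPLACE TABLE telefonnummern(fullname VARCHAR, phone VARCHAR);')
 
 
con_p.execute("INSERT INTO telefonnummern VALUES ('Max Mustermann', '0123-4567890')")
 
# list the tables of all attached databases
print(con_p.sql('SHOW ALL TABLES'))
 
print(con_p.sql('SELECT * FROM telefonnummern;'))
 
 
# .sql() returns a relation that can be converted in several ways
ddb_object = con_p.sql('SELECT * FROM telefonnummern;')
 
df = ddb_object.to_df()            # as pandas DataFrame
 
ddb_tuple = ddb_object.fetchall()  # as list of tuples
 
print(df)
 
print(ddb_tuple)
 
con_p.close()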

Python: replacing print() with ic()

Here are a few examples of how to replace print() output with icecream.

"""
icecream examples
"""
 
 
from icecream import ic
 
# define some function
def addiere(x, y):
    return x + y
 
# call ic
ic(addiere(1, 2))
 
# Output:
# ic| addiere(1, 2): 3
 
 
d = {'i': 2, 'j': 3, 'k': 4711}
ic(d['k'])
 
 
struct = {
    "hersteller": "VW",
    "modell": "Golf",
    "Farben": ["gelb", "rot"]
}
 
ic(struct)
 
ic.disable()
ic(struct) # no output
ic.enable()
 
 
def logstuff(text):
    # e.g. write to a log file; here the text is simply printed
    print(text)
 
 
ic.configureOutput(prefix="Hallo| ", outputFunction=logstuff)
 
ic(addiere(7, 7))
 
 
ic.configureOutput(prefix="Welt| ", outputFunction=logstuff)
 
ic(addiere(7, 7))

Generating camt.053 from MT940 with Python

Dante e.V. needed to generate modern CAMT.053 files from MT940 files, and thanks to Python this became a manageable task.

Step 1

Parse the MT940 file and transfer the transactions into a pandas DataFrame.

import mt940
import pprint
import pandas as pd

transactions = mt940.parse('Umsaetze_2310007_22.07.2024.mta')

print('Transactions:')
pprint.pprint(transactions.data)

rows = []
for transaction in transactions:
    print('Transaction: ', transaction)
    pprint.pprint(transaction.data)
    # one dict per booking; collect them and build the DataFrame once at the end
    rows.append(transaction.data)

df = pd.DataFrame(rows)
df.to_excel('AllBookings.xlsx', index=False)

Step 2

Fill the XML from the DataFrame. I take the account balance information needed for the header of the XML file from the MT940 file as well.

import pandas as pd # data wrangling
import jinja2 # template engine
import os # for file-related stuff
import mt940
from datetime import datetime


today = datetime.today()
now = today.strftime("%Y-%m-%d")



transactions = mt940.parse('Umsaetze_2310007_22.07.2024.mta')

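# opening and closing balance for the XML header;
# str(amount) looks like e.g. '1234.56 EUR', [:-4] cuts off the currency
opening = transactions.data['final_opening_balance']
openingamount = str(opening.amount)[:-4]
openingdate = opening.date

closing = transactions.data['final_closing_balance']
closingamount = str(closing.amount)[:-4]
closingdate = closing.date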

 
# create jinja env that can load template from filesystem
jinja_env = jinja2.Environment(loader = jinja2.FileSystemLoader(os.path.abspath('.')))
 
df = pd.read_excel('AllBookings.xlsx', dtype={'date': str,'amount':str})

df['CreditDebit'] = ''
df['CreditDebit'] = df['CreditDebit'].where(df['amount'].str.get(0).isin(['-']), 'CRDT')
df['CreditDebit'] = df['CreditDebit'].where(~df['amount'].str.get(0).isin(['-']), 'DBIT')

# cut off the time part (e.g. ' 00:00:00', 9 characters)
df['date'] = df['date'].str[:-9]


# remove currency
df['amount'] = df['amount'].str[:-4]
# remove sign
df['amount'] = df['amount'].str.replace('-','')

#df['amount'].replace('-','',inplace=True)
#df["amount"] = df["amount"].apply(lambda x: x.str.replace("-", ""))

template = jinja_env.get_template('Ntry.xml')
 
# write UTF-8 explicitly, the default encoding on Windows is not UTF-8
with open('FertigesXML.xml', 'w', encoding='utf-8') as output:
    output.write(template.render(data=df,
                                 openingamount=openingamount,
                                 openingdate=openingdate,
                                 closingamount=closingamount,
                                 closingdate=closingdate
                                 ))
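The two where() calls derive the credit/debit indicator from the sign of the amount. With numpy.where the same can be written in one step. A sketch on made-up amounts, assuming strings like "-12.34" as they are once the currency has been cut off; lstrip only removes the leading minus sign.

```python
import numpy as np
import pandas as pd

# made-up amounts as strings, currency already removed
df = pd.DataFrame({"amount": ["-12.34", "100.00", "-0.50"]})

is_debit = df["amount"].str.startswith("-")
# camt.053 expects a positive amount plus a CRDT/DBIT indicator
df["CreditDebit"] = np.where(is_debit, "DBIT", "CRDT")
df["amount"] = df["amount"].str.lstrip("-")

print(df)
```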

Jinja2 XML template

You can find the XML template for Jinja2 here:

Ntry_blog
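As the template itself is only linked, here is a heavily shortened, hypothetical stand-in that shows the mechanism: a Jinja2 loop emits one Ntry element per DataFrame row. The element names Ntry, Amt, CdtDbtInd and BookgDt come from the camt.053 schema, but a valid camt.053 file needs many more elements (group header, statement, balances), and the currency is hard-coded to EUR here, so this does not replace the real template.

```python
import jinja2
import pandas as pd

# hypothetical, shortened template; the real one is linked above
TEMPLATE = """\
{%- for row in rows %}
<Ntry>
  <Amt Ccy="EUR">{{ row.amount }}</Amt>
  <CdtDbtInd>{{ row.CreditDebit }}</CdtDbtInd>
  <BookgDt><Dt>{{ row.date }}</Dt></BookgDt>
</Ntry>
{%- endfor %}
"""

df = pd.DataFrame({
    "date": ["2024-07-01", "2024-07-02"],  # made-up bookings
    "amount": ["12.34", "100.00"],
    "CreditDebit": ["DBIT", "CRDT"],
})

# autoescape protects the XML against characters like & or < in the data
template = jinja2.Environment(autoescape=True).from_string(TEMPLATE)
# to_dict("records") gives one dict per row; Jinja resolves row.amount as a key lookup
xml = template.render(rows=df.to_dict("records"))
print(xml)
```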

SFTP with Python and the Paramiko library – Upload

Besides downloading files, uploading files works without any problems too.

import os
import posixpath
import paramiko
 
# Replace these variables with your specific values
host = '192.168.0.22'
port = 22
username = '<user>'
private_key_path = 'keyfile'
remote_directory_path = '/home/uwe/uploadtest'
local_directory_path = 'E:/uploadtest'
 
 
# Establish an SSH transport session
# (without a hostkey argument, Transport.connect does not verify the server's host key)
private_key = paramiko.RSAKey(filename=private_key_path)
transport = paramiko.Transport((host, port))
transport.connect(username=username, pkey=private_key)
 
# Create an SFTP client
sftp = paramiko.SFTPClient.from_transport(transport)
 
try:
    # Iterate through local files in the specified folder
    for local_file in os.listdir(local_directory_path):
        local_file_path = os.path.join(local_directory_path, local_file)
 
        # Upload regular files only; enable the commented-out condition to restrict to CSV files
        if os.path.isfile(local_file_path): # and local_file.lower().endswith('.csv'):
            # remote paths always use '/', regardless of the local OS
            remote_file_path = posixpath.join(remote_directory_path, local_file)
 
            # Upload the file
            sftp.put(local_file_path, remote_file_path)
            print(f"Uploaded: {local_file} to {remote_file_path}")
 
finally:
    # Close the SFTP session and SSH transport
    sftp.close()
    transport.close()
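Remote paths on an SFTP server are POSIX paths, while os.path.join uses the separator of the local machine, which is a backslash on Windows. posixpath.join from the standard library always uses a forward slash. The difference can be shown on any OS by calling ntpath, the module behind os.path on Windows, directly:

```python
import ntpath
import posixpath

remote_dir = "/home/uwe/uploadtest"

# what os.path.join does on Windows: a backslash ends up in the remote path
windows_style = ntpath.join(remote_dir, "data.csv")
# posixpath.join uses "/" regardless of the local OS
posix_style = posixpath.join(remote_dir, "data.csv")

print(windows_style)  # /home/uwe/uploadtest\data.csv
print(posix_style)    # /home/uwe/uploadtest/data.csv
```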

SFTP with Python and the Paramiko library – Download

I currently need functions to fetch files from SFTP servers with Python and to upload files to them. ChatGPT came up with the following code for me, which works very well.

import os
import paramiko
 
# Replace these variables with your specific values
host = '192.168.0.238'
port = 22
username = '<user>'
private_key_path = '<keyfile>'
remote_directory_path = '/home/uwe/downloadtest'
local_directory_path = 'E:/downloadtest'
 
# Create a new SSH client
ssh_client = paramiko.SSHClient()
 
# Automatically add the server's host key
# (convenient for tests, but unknown host keys are accepted without verification)
ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
 
# Establish SSH connection
try:
    # Load the private key for authentication
    private_key = paramiko.RSAKey.from_private_key_file(private_key_path)
 
    # Connect to the server
    ssh_client.connect(hostname=host, port=port, username=username, pkey=private_key)
 
    # Open an SFTP session on the SSH connection
    sftp = ssh_client.open_sftp()
 
    # Change to the remote directory
    sftp.chdir(remote_directory_path)
 
    # List all files in the remote directory
    files = sftp.listdir()
 
    # Download each .txt file in the remote directory
    for file_name in files:
        # os.path.join uses the local separator, so normalize to forward slashes
        remote_file_path = os.path.join(remote_directory_path, file_name).replace("\\", "/")
        local_file_path = os.path.join(local_directory_path, file_name).replace("\\", "/")
        print(remote_file_path, local_file_path)
 
        # Only download .txt files
        if file_name.lower().endswith('.txt'):
            sftp.get(remote_file_path, local_file_path)
            print(f"File '{file_name}' downloaded successfully to '{local_directory_path}'")
 
    # Close the SFTP session
    sftp.close()
 
except paramiko.AuthenticationException:
    print("Authentication failed. Please check your credentials or SSH key path.")
except paramiko.SSHException as e:
    print(f"SSH connection failed: {e}")
except FileNotFoundError:
    print("File not found. Please provide the correct file paths.")
except Exception as e:
    print(f"An error occurred: {e}")
finally:
    # Closing the client also closes the underlying SSH transport
    ssh_client.close()
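sftp.listdir() also returns the names of subdirectories, so a directory that happens to end in .txt would make sftp.get() fail. As an alternative, sftp.listdir_attr() returns SFTPAttributes objects that carry filename and st_mode, and the stat module can then pick out regular files. A sketch with the hypothetical helper regular_txt_files; so that it runs without a server, it is fed stand-in objects that have just those two attributes.

```python
import stat
from types import SimpleNamespace


def regular_txt_files(attrs):
    """Return the names of regular .txt files from sftp.listdir_attr() results."""
    return [
        a.filename
        for a in attrs
        if a.st_mode is not None
        and stat.S_ISREG(a.st_mode)
        and a.filename.lower().endswith(".txt")
    ]


# stand-ins for paramiko.SFTPAttributes; only filename and st_mode are used
sample = [
    SimpleNamespace(filename="report.TXT", st_mode=stat.S_IFREG | 0o644),
    SimpleNamespace(filename="archive.txt", st_mode=stat.S_IFDIR | 0o755),
    SimpleNamespace(filename="data.csv", st_mode=stat.S_IFREG | 0o644),
]
print(regular_txt_files(sample))  # ['report.TXT']

# with a real connection:
# for file_name in regular_txt_files(sftp.listdir_attr(remote_directory_path)): ...
```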