Posts tagged ‘FX rates’

dbt and the ECB f/x Rates – Part 3

This entry is Teil 3 von 3 in the series dbt - ECB FX Example

In this final article of this series I will add another data mart. In the models/marts subfolder we add fx_current_rates.sql with the following SQL code:

{{ config(materialized='view') }}
 
SELECT
    currency,
    valid_from,
    rate_vs_eur
FROM {{ REF('fx_rates_with_validity') }}
WHERE is_current = TRUE

This will be a new view on top of the „fx_rates_with_validity“ mart. As this view ‚materializes‘ in the Postgres DB, we can use HeidiSQL to retrieve data from it:

dbt and the ECB f/x Rates – Part 2

This entry is Teil 2 von 3 in the series dbt - ECB FX Example

In this article we will continue our dbt journey. With dbt-core installed we can ask dbt to set up a new project for us.

Simply go to the root folder where your project folder shall be created and run dbt init projectname. dbt will then ask you for the database adapter and the connection settings as well as for the schema dbt is to build its objects in.

After these questions and answers dbt tells you that you may run dbt debug now to check if everything is fine, BUT: you need to switch first to the subdirectory you created otherwise there will be a meaningless error message:


Encountered an error:
Internal Error
Profile should not be None if loading profile completed

If run in the correct subdirectory dbt debug should not produce any errors. If it does, run the error message through the search engine of your choice.

C:\Users\UWe\Desktop\fx_project>dbt debug
16:52:20  Running with dbt=1.10.13
16:52:20  dbt version: 1.10.13
16:52:20  python version: 3.12.6
16:52:20  python path: C:\Python312\python.exe
16:52:20  os info: Windows-11-10.0.26200-SP0
16:52:20  Using profiles dir at C:\Users\UWe\.dbt
16:52:20  Using profiles.yml file at C:\Users\UWe\.dbt\profiles.yml
16:52:20  Using dbt_project.yml file at C:\Users\UWe\Desktop\fx_project\dbt_project.yml
16:52:20  adapter type: postgres
16:52:20  adapter version: 1.9.1
16:52:20  Configuration:
16:52:20    profiles.yml file [OK found and valid]
16:52:20    dbt_project.yml file [OK found and valid]
16:52:20  Required dependencies:
16:52:20   - git [OK found]

16:52:20  Connection:
16:52:20    host: localhost
16:52:20    port: 5432
16:52:20    user: postgres
16:52:20    database: FX
16:52:20    schema: dbt
16:52:20    connect_timeout: 10
16:52:20    role: None
16:52:20    search_path: None
16:52:20    keepalives_idle: 0
16:52:20    sslmode: None
16:52:20    sslcert: None
16:52:20    sslkey: None
16:52:20    sslrootcert: None
16:52:20    application_name: dbt
16:52:20    retries: 1
16:52:20  Registered adapter: postgres=1.9.1
16:52:20    Connection test: [OK connection ok]

16:52:20  All checks passed!

In the project folder you find a set of subfolders, remove the models/example subfolder, as it may cause error messages.

Next we will create some basic models, using YAML files.

In models, create sources.yml with the following content:

version: 2
sources:
  - name: raw
    schema: raw
    tables:
      - name: ecb_fx_rates

In models, create a subfolder staging and add stg_fx_rates.sql to this staging subfolder with the following content:

{{ config(materialized='view') }}
SELECT
    fx_date,
    currency,
    rate,
    created_at
FROM {{ SOURCE('raw', 'ecb_fx_rates') }}

This will simply select all the data we have in our raw DB schema.

Next comes is the interesting part. Each row gets a valid_from and valid_to date using a window function.

In models, create a subfolder marts and add fx_rates_with_validity.sql to this marts subfolder with the following content:

{{ config(materialized='table') }}
 
WITH base AS (
    SELECT
        fx_date,
        currency,
        rate
    FROM {{ REF('stg_fx_rates') }}
),
 
with_validity AS (
    SELECT
        currency,
        fx_date                        AS valid_from,
        -- valid_to is the day before the next rate, or null if it's the latest
        lead(fx_date) OVER (
            partition BY currency
            ORDER BY fx_date
        ) - INTERVAL '1 day'             AS valid_to,
        rate,
        -- flag the currently active rate
        CASE
            WHEN lead(fx_date) OVER (
                partition BY currency ORDER BY fx_date
            ) IS NULL THEN TRUE
            ELSE FALSE
        END                              AS is_current
    FROM base
)
 
SELECT * FROM with_validity
ORDER BY currency, valid_from

This will give uzs data as the following

currency valid_from valid_to rate_vs_eur is_current
USD 2025-01-02 2025-01-02 1.0312 false
USD 2025-01-03 2025-01-05 1.0345 false
USD 2025-01-04 null 1.0821 true

It also makes sense now to add some data checks.

In models, create schema.yml with the following content:

version: 2

models:
  - name: stg_fx_rates
    columns:
      - name: rate_date
        tests: [not_null]
      - name: currency
        tests: [not_null]
      - name: rate_vs_eur
        tests: [not_null]

  - name: fx_rates_with_validity
    columns:
      - name: currency
        tests: [not_null]
      - name: valid_from
        tests: [not_null]
      - name: is_current
        tests: [not_null]

We can now run the whole setup:

dbt run
dbt test
dbt docs generate
dbt docs serve

Going forward

Since ECB publishes the latest rate every data at around 16:00 CET the Python job to fetch them should be run each day (apparently after 16:00 CET). Once the raw database is updated you can update the dbt models:

dbt run --select stg_fx_rates fx_rates_with_validity
dbt test --select stg_fx_rates fx_rates_with_validity

In the final part of the series we will add another data market for the latest rates.

dbt and the ECB f/x Rates – Part 1

This entry is Teil 1 von 3 in the series dbt - ECB FX Example

I have recently found the necessity to deal with dbt and Snowflake. As I learn best by practically doing something I decided to write a few practical use cases. Privately I will focus on Postgres SQL as I am not sure about Snowflake’s free tier. What is this dbt? dbt (data build tool) is a commandline tool based on Python that take care of the „T“ in „ETL“, the transformation of data in a data warehouse using SQL. dbt does not care about how you get the raw data into the (raw) DWH, this is not in its scope. dbt-core is available for free (Therefore I will focus on dbt-core, for production environments the paid tiers are most likely more suitable).

In this first example we will download the ECB fx rates from the European Central Bank and play a bit with them by transforming them and creating some data marts.

As mentioned I will use Postgres locally on my machine, so the installation of Postgres on my Windows machine was step 1. To deal with Postgres I can recommend HeidiSQL, does the trick.

Step 2 was the installation of dbt in Python. (Note that I had issues with Python 3.14, I downgraded to Python 3.12)

Besides dbt-core we need the postgres-Adapter dbt-postgres, so pip install dbt-core dbt-postgres helped. I usually do not need virtual environments, so I just installed it globally. We also need the Postgres library and lxml and requests for the ingestion of the XML file:

pip install requests psycopg2-binary lxml

As mentioned dbt does not care how you load the raw data, so with the help of some AI-tool I got the following Python Code to load the 90-days F/X rates file from the European Central Bank.

import requests
import xml.etree.ElementTree as ET
from datetime import datetime
import psycopg2
from psycopg2.extras import execute_values
 
ECB_URL = "https://www.ecb.europa.eu/stats/eurofxref/eurofxref-hist-90d.xml"
 
DB_CONFIG = {
    "host": "localhost",
    "port": 5432,
    "dbname": "FX",
    "user": "postgres",
    "password": "<meingeheimespasswort>",
}
 
def fetch_ecb_fx():
    r = requests.get(ECB_URL, timeout=30)
    r.raise_for_status()
    root = ET.fromstring(r.content)
    ns = {
        "gesmes": "http://www.gesmes.org/xml/2002-08-01",
        "def": "http://www.ecb.int/vocabulary/2002-08-01/eurofxref",
    }
 
    rows = []
 
    # Find Cube with time attribute
    for cube in root.findall(".//def:Cube[@time]", ns):
        fx_date = cube.attrib["time"]
 
        for rate in cube.findall("def:Cube", ns):
            rows.append(
                (
                    fx_date,
                    rate.attrib["currency"],
                    float(rate.attrib["rate"]),
                )
            )
 
    return rows
 
 
def init_db(conn):
    with conn.cursor() as cur:
        cur.execute(
            """
            CREATE SCHEMA IF NOT EXISTS raw;
 
            CREATE TABLE IF NOT EXISTS raw.ecb_fx_rates (
                fx_date date NOT NULL,
                currency varchar(3) NOT NULL,
                rate numeric(20,10) NOT NULL,
                created_at timestamp default now(),
                PRIMARY KEY (fx_date, currency)
            );
            """
        )
    conn.commit()
 
 
def upsert_rates(conn, rows):
    sql = """
        INSERT INTO raw.ecb_fx_rates (fx_date, currency, rate)
        VALUES %s
        ON CONFLICT (fx_date, currency)
        DO UPDATE SET rate = EXCLUDED.rate;
    """
 
    with conn.cursor() as cur:
        execute_values(cur, sql, rows)
 
    conn.commit()
 
 
def main():
    rows = fetch_ecb_fx()
    print(f"Fetched {len(rows)} FX rates")
 
    conn = psycopg2.connect(**DB_CONFIG)
 
    try:
        init_db(conn)
        upsert_rates(conn, rows)
        print("Load complete")
    finally:
        conn.close()
 
 
if __name__ == "__main__":
    main()

When this code is run it should provide some output close to the following.


Fetched 1798 FX rates
Load complete

In the second part of this series we will take care of the dbt work.