# CodeTransformer

The `CodeTransformer` is a component designed to execute custom Python code that transforms data stored in one or more SDIF files into various output file formats. It provides a flexible way to define and run data processing logic directly within your workflow.
## 1. Basic Usage
The simplest way to use `CodeTransformer` is to provide a Python function that performs the transformation.

**Define your transformation function:**

This function takes a `sqlite3.Connection` object, which provides access to the input SDIF data attached as schemas (e.g., `db1`). It must return a dictionary where keys are relative output filenames and values are the data to be written (e.g., a pandas DataFrame).
```python
# Example transformation function
import sqlite3
from datetime import timedelta
from typing import Any, Dict

import pandas as pd


def process_invoices(conn: sqlite3.Connection) -> Dict[str, Any]:
    """Reads invoices, calculates due dates, and returns a DataFrame."""
    df = pd.read_sql_query("SELECT * FROM db1.factures WHERE type = 'Facture'", conn)

    # Simple calculation example
    df['IssueDateDT'] = pd.to_datetime(df['date_emission'])
    df['DueDate'] = (df['IssueDateDT'] + timedelta(days=30)).dt.strftime('%Y-%m-%d')

    # Select and rename columns
    final_df = df[['id_facture', 'client', 'montant_ttc', 'DueDate']].rename(columns={
        'id_facture': 'InvoiceID',
        'client': 'Customer',
        'montant_ttc': 'TotalAmount',
    })

    # Return dictionary: {output_filename: data_object}
    return {"processed_invoices.csv": final_df}
```
**Instantiate and run the transformer:**
```python
from pathlib import Path

from satif.transformers.code import CodeTransformer

# Assume 'input_invoices.sdif' exists
input_sdif_path = Path("input_invoices.sdif")
output_csv_path = Path("output/processed_invoices.csv")

# 1. Create the transformer instance with the function
transformer = CodeTransformer(function=process_invoices)

# 2. Execute the transformation and export the result
result_path = transformer.export(
    sdif=input_sdif_path,
    output_path=output_csv_path,
)

print(f"Transformation successful. Output written to: {result_path}")
```
This will execute the `process_invoices` function using the data from `input_invoices.sdif` (accessed as schema `db1`) and write the resulting DataFrame to `output/processed_invoices.csv`.
## 2. Providing Transformation Logic
Besides passing a function object directly, you can provide the transformation logic in other ways:
**a) Code String:**

Pass the Python code as a string. You may need to specify the function name if it is not the default (`transform`).
TRANSFORM_CODE = """
import pandas as pd
import sqlite3
# Default function name is 'transform'
def transform(conn: sqlite3.Connection):
df = pd.read_sql_query("SELECT client, SUM(montant_ht) as total_ht FROM db1.factures GROUP BY client", conn)
return {"client_summary.json": df}
"""
transformer = CodeTransformer(function=TRANSFORM_CODE)
# If function name was different, e.g., 'generate_summary':
# transformer = CodeTransformer(function=TRANSFORM_CODE, function_name="generate_summary")
**b) File Path:**

Provide a `pathlib.Path` object pointing to a Python file containing the transformation function.
```python
# Assume 'my_transforms/invoice_logic.py' contains the 'process_invoices' function
transform_script_path = Path("my_transforms/invoice_logic.py")

transformer = CodeTransformer(
    function=transform_script_path,
    function_name="process_invoices",  # Specify the function to run from the file
)
# ... then call transformer.export(...)
```
**c) Using the `@transformation` Decorator:**

Decorate your transformation functions with `@transformation`. This registers them internally, allowing you to instantiate `CodeTransformer` with either the function object or its registered name (which defaults to the function name).
```python
from satif.transformers.code import transformation


@transformation
def generate_report(conn):
    # ... logic ...
    return {"report.txt": "Report content"}


@transformation(name="custom_invoice_summary")  # Register with a custom name
def create_invoice_summary(conn):
    # ... logic ...
    return {"summary.csv": df_summary}


# Instantiate using the function object
transformer1 = CodeTransformer(function=generate_report)

# Instantiate using the registered name (string)
transformer2 = CodeTransformer(function="custom_invoice_summary")

# ... then call transformer.export(...) for each
```
## 3. The Transformation Function
Your transformation code needs to adhere to specific requirements:
- **Signature:** Must accept at least one argument: `conn: sqlite3.Connection`. It can optionally accept a second argument: `context: Dict[str, Any]`.
- **Input Data Access:** Use the `conn` object to query data from the attached SDIF files. Standard SQL queries (e.g., via `pandas.read_sql_query`) work directly. Input SDIF files are attached as schemas (see Section 4).
- **Return Value:** MUST return a dictionary (`Dict[str, Any]`); a combined sketch follows this list.
  - **Keys:** Relative paths for the output files (e.g., `"data/summary.csv"`, `"report.json"`). Subdirectories are created automatically during export. Use POSIX-style separators (`/`).
  - **Values:** The data to be written. Supported types include:
    - `pandas.DataFrame`
    - `dict` or `list` (saved as JSON)
    - `str` (saved as UTF-8 text)
    - `bytes` (saved as a binary file)
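Putting these requirements together, here is a minimal sketch using the optional two-argument signature and returning several supported value types (the `db1.orders` table is illustrative, not part of the API):

```python
import sqlite3
from typing import Any, Dict

import pandas as pd


def transform(conn: sqlite3.Connection, context: Dict[str, Any]) -> Dict[str, Any]:
    # Query the input SDIF attached as schema 'db1' (illustrative table name)
    df = pd.read_sql_query("SELECT * FROM db1.orders", conn)

    # Keys are relative output paths; values may be any supported type
    return {
        "data/orders.csv": df,                        # DataFrame -> CSV
        "stats.json": {"rows": len(df)},              # dict -> JSON
        "notes.txt": f"Context received: {context}",  # str -> UTF-8 text
        "raw.bin": df.to_csv(index=False).encode(),   # bytes -> binary file
    }
```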
## 4. Handling Input SDIFs
`CodeTransformer` can process data from one or multiple SDIF files.

Note: Throughout this documentation, `SDIFPath` refers to either a string path or a `pathlib.Path` object pointing to an SDIF file.
**a) Single SDIF Path:**

The default behavior, shown previously. The data is accessible via the schema named `db1` (or a custom prefix; see Advanced Configuration).

```python
transformer.export(sdif="path/to/single.sdif", ...)
# SQL: SELECT * FROM db1.my_table
```
**b) List of SDIF Paths:**

Provide a list of paths. They will be attached sequentially with default schema names `db1`, `db2`, etc.

```python
transformer.export(sdif=["invoices.sdif", "clients.sdif"], ...)
# SQL: SELECT * FROM db1.invoices_table
# SQL: SELECT * FROM db2.clients_table
```
**c) Dictionary of Schema Names to Paths:**

Provide a dictionary mapping your desired schema names to the corresponding SDIF file paths. This gives you explicit control over schema naming in your SQL queries.

```python
transformer.export(
    sdif={
        "inv": "path/to/invoices.sdif",
        "cust": "path/to/customers.sdif",
    },
    ...
)
# SQL: SELECT * FROM inv.invoices_table
# SQL: SELECT i.*, c.name FROM inv.invoices_table i JOIN cust.info c ON i.cust_id = c.id
```
**d) Using an `SDIFDatabase` Instance:**

If you already have an open `SDIFDatabase` object (from `satif.sdif_database`), you can pass it directly. The transformer will use its existing connection and schema name, and it will not close the connection for you.
```python
from satif.sdif_database import SDIFDatabase

# Assume db is an initialized SDIFDatabase instance with schema_name='input_data'
db = SDIFDatabase("my_data.sdif", read_only=True, schema_name="input_data")
try:
    transformer.export(sdif=db, ...)
    # SQL: SELECT * FROM input_data.some_table
finally:
    db.close()  # Remember to close the database connection yourself
```
## 5. Exporting Results (`export` method)

The `export` method takes the transformed data and writes it to the filesystem.
- `sdif`: The input SDIF source(s), as described above.
- `output_path`:
  - If the transformation returns a single output file and `output_path` does not point to an existing directory, `output_path` is treated as the exact path for that single output file.
  - Otherwise, `output_path` is treated as the base directory where the output files (using the relative paths from the transformation result keys) will be written. If the directory doesn't exist, it will be created.
  - Defaults to the current directory (`.`).
- `zip_archive` (bool, default `False`):
  - If `True`, all output files are written into a single ZIP archive named according to `output_path`. In this case, `output_path` must be a file path (e.g., `"output/archive.zip"`), not a directory. See the example after this list.
  - If `False` (default), files are written directly to the filesystem based on `output_path`.
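For example, a minimal sketch that bundles every output into one archive (reusing the `transformer` and input file from Section 1; the paths are illustrative):

```python
# All files returned by the transformation are packed into a single ZIP.
transformer.export(
    sdif="input_invoices.sdif",
    output_path="output/invoices_bundle.zip",  # must be a file path when zip_archive=True
    zip_archive=True,
)
```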
**File Writing Behavior:**

The format used for writing depends on the file extension in each key of the dictionary returned by your transformation function and on the type of the corresponding data object:
- `.csv`: Writes `pandas.DataFrame` using `to_csv(index=False)`.
- `.json`: Writes `pandas.DataFrame` using `to_json(orient="records", indent=2)`; writes `dict` or `list` using `json.dump(indent=2)`.
- `.xlsx` / `.xls`: Writes `pandas.DataFrame` using `to_excel(index=False)`. Requires optional dependencies (`openpyxl` for `.xlsx`).
  - Note: You must install `openpyxl` to write Excel files: `pip install openpyxl`. If it is not installed, an `ExportError` is raised when attempting to write Excel files.
- `.txt` (or other text-based extensions): Writes `str` content.
- Binary extensions (e.g., `.bin`): Writes `bytes` content.
- Fallback: If a `DataFrame` is mapped to an unsupported extension, it is written as `.csv`. Other unsupported data types will cause an error during export.
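As a quick illustration, the extension of each returned key alone selects the writer; this sketch reuses the `db1.factures` table from the earlier examples:

```python
def transform(conn):
    df = pd.read_sql_query("SELECT * FROM db1.factures", conn)
    return {
        "invoices.csv": df,   # written via to_csv(index=False)
        "invoices.json": df,  # written via to_json(orient="records", indent=2)
        "invoices.xlsx": df,  # written via to_excel(index=False); requires openpyxl
    }
```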
**Security Note:** Output filenames are sanitized to prevent writing outside the target `output_path` directory (e.g., paths like `../other_dir/file.txt` or `/absolute/path/file.txt` are disallowed).
## 6. In-Memory Transformation (`transform` method)

If you only need the transformed data in memory (e.g., for further processing in Python) without writing files, use the `transform` method. It accepts the same `sdif` parameter types as the `export` method.
```python
# Get the transformed data as a dictionary {filename: data_object}
transformed_data = transformer.transform(sdif="input.sdif")

# Access the data
df_invoices = transformed_data.get("processed_invoices.csv")
if df_invoices is not None:
    print(f"Processed {len(df_invoices)} invoices in memory.")

# If you need to export this data later, you can use the _export_data method
# transformer._export_data(data=transformed_data, output_path="output/dir")
```
## 7. Advanced Configuration

You can customize the `CodeTransformer` during initialization:
- `function_name` (str, default `"transform"`): The name of the function to call when `function` is provided as a code string or file path. Ignored if `function` is a callable or uses the `@transformation` decorator (which sets the name automatically).
- `extra_context` (Dict[str, Any], default `{}`): A dictionary of arbitrary Python objects passed as the `context` argument to your transformation function (if it accepts two arguments). Useful for passing parameters, configurations, or shared objects.
- `db_schema_prefix` (str, default `"db"`): The prefix used for default schema names (`db1`, `db2`, ...) when a list of SDIF paths is provided as input. Ignored if the input is a single path, a dictionary, or an `SDIFDatabase` instance.
- `code_executor` (CodeExecutor, optional): In production, you can plug in a different execution backend (e.g., a sandboxed environment with E2B). Defaults to `LocalCodeExecutor`, which runs the code in the current Python process.
**Example with Context and Custom Prefix:**
```python
# Transformation function accepting context
def process_with_context(conn, context):
    discount = context.get("discount_rate", 0.0)
    df = pd.read_sql_query(f"SELECT * FROM {context['schema']}.data", conn)
    df["discounted_price"] = df["price"] * (1 - discount)
    return {"discounted_data.csv": df}


# Instantiate with context and custom prefix for list input
transformer = CodeTransformer(
    function=process_with_context,
    extra_context={"discount_rate": 0.15, "schema": "src1"},  # Pass context
    db_schema_prefix="src",  # Input list schemas will be src1, src2...
)

# Run with a list of SDIFs
transformer.export(
    sdif=["data1.sdif", "data2.sdif"],
    output_path="output/context_example",
)
# SQL inside the function uses 'src1.data' because of context['schema']
```
## 8. Error Handling
- Errors during execution of the transformation code (e.g., SQL errors, Python exceptions within your function) are caught and re-raised as an `ExportError`.
- Errors during file I/O (writing outputs) are also raised as `ExportError`.
- Configuration errors (e.g., invalid input types, non-existent input files) typically raise standard Python exceptions such as `TypeError`, `ValueError`, or `FileNotFoundError`.
- Syntax errors in code strings/files may raise `ValueError` during initialization or `ExportError` during execution.
Always wrap calls to `transform` or `export` in a `try...except` block to handle potential failures gracefully.
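A minimal sketch of such a guard, following the error categories above. The `satif.exceptions` import path for `ExportError` is an assumption; adjust it to wherever your version of the library defines the exception.

```python
from satif.exceptions import ExportError  # assumed import path

try:
    result_path = transformer.export(
        sdif="input_invoices.sdif",
        output_path="output/processed_invoices.csv",
    )
    print(f"Export succeeded: {result_path}")
except ExportError as exc:
    # Failures inside the transformation code or while writing outputs
    print(f"Transformation/export failed: {exc}")
except (TypeError, ValueError, FileNotFoundError) as exc:
    # Configuration problems: bad input types, missing files, bad code strings
    print(f"Configuration error: {exc}")
```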