[okfn-labs] Working with Bubbles for a data processing pipeline

Stefan Urbanek stefan.urbanek at gmail.com
Tue Dec 30 22:26:32 UTC 2014


I’m sad to say that the future of Bubbles is unclear. It is still a prototype, and the proper branch, named ‘generative’, is unfinished. Its fate will be clearer after the first Data Brewery NYC meetup in the second half of January. Meanwhile I would recommend using plain SQLAlchemy (generating statements), combined with streamed ETL using mETL [1] where needed.

My general advice for the problem described below would be:

1. Validate the CSV header; if it is bad, STOP.
2. Load the CSV into a table as soon as you can; STOP if the CSV is invalid (can’t be loaded). If you expect large CSVs, use COPY FROM [2][3] (get the raw connection from SQLAlchemy).
3. Perform the validation in the table using generated SQLAlchemy statements. STOP if critical errors are found. Store the results somewhere.
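A minimal sketch of steps 1 and 2 using only the standard library. It is an illustration under stated assumptions, not a drop-in implementation: sqlite3 stands in for PostgreSQL so the example runs anywhere, rows are inserted one by one instead of with COPY, and the names validate_header, load_csv and staging_budget are hypothetical. With PostgreSQL and large files, the INSERT loop would instead become a single COPY ... FROM STDIN WITH CSV HEADER issued on engine.raw_connection() through psycopg2's copy methods [3].

```python
import csv
import io
import sqlite3


def validate_header(header, expected):
    """Step 1: return a list of header problems (empty list means OK)."""
    problems = []
    missing = [name for name in expected if name not in header]
    extra = [name for name in header if name not in expected]
    if missing:
        problems.append("missing columns: %s" % ", ".join(missing))
    if extra:
        problems.append("unexpected columns: %s" % ", ".join(extra))
    if len(set(header)) != len(header):
        problems.append("duplicate column names")
    return problems


def load_csv(connection, table, stream, expected):
    """Steps 1 and 2: check the header, then load every value as text.

    Raises ValueError (i.e. STOP) if the header or any row is invalid.
    """
    reader = csv.reader(stream)
    header = next(reader, None)
    if header is None:
        raise ValueError("empty file")
    problems = validate_header(header, expected)
    if problems:
        raise ValueError("; ".join(problems))
    # All columns are VARCHAR; type checks belong to step 3.
    columns = ", ".join('"%s" VARCHAR' % name for name in header)
    connection.execute('CREATE TABLE "%s" (%s)' % (table, columns))
    insert = 'INSERT INTO "%s" VALUES (%s)' % (table, ", ".join("?" * len(header)))
    for row in reader:
        if len(row) != len(header):
            raise ValueError("line %d: expected %d fields, got %d"
                             % (reader.line_num, len(header), len(row)))
        connection.execute(insert, row)
    connection.commit()


# Example: load a small in-memory CSV into a staging table.
connection = sqlite3.connect(":memory:")
load_csv(connection, "staging_budget",
         io.StringIO("name,amount\na,10\nb,\n"), ["name", "amount"])
```

Loading every value as VARCHAR means the load only fails on structural problems (a bad header, a wrong number of fields); whether "10" is a valid amount is left to step 3.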

Have a system for automatically managing a staging schema (or one kind of staging schema). For our type of problem (OS), a temporary table (ON COMMIT DROP) with VARCHAR columns for the loaded CSV would be a sufficient start. SQLAlchemy does not support temporary tables, so you either handcraft the SQL string, create a compiled SQLAlchemy object, or go with a non-temporary table that you drop manually.
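If you handcraft the SQL string, it can be generated from the CSV header. A minimal sketch, assuming PostgreSQL, where the clause is spelled ON COMMIT DROP and the table disappears when the transaction that created it commits, so loading and validation have to happen inside that same transaction. The helper names are hypothetical; with SQLAlchemy the resulting string could be executed with connection.execute(sqlalchemy.text(ddl)).

```python
def quote_identifier(name):
    """Double-quote an SQL identifier, doubling any embedded double quotes."""
    return '"%s"' % name.replace('"', '""')


def staging_table_ddl(table, columns):
    """Build PostgreSQL DDL for a temporary staging table of VARCHAR columns."""
    column_list = ", ".join("%s VARCHAR" % quote_identifier(name) for name in columns)
    # ON COMMIT DROP: PostgreSQL drops the table at the end of the transaction.
    return "CREATE TEMPORARY TABLE %s (%s) ON COMMIT DROP" % (
        quote_identifier(table), column_list)


print(staging_table_ddl("staging_budget", ["name", "amount"]))
# CREATE TEMPORARY TABLE "staging_budget" ("name" VARCHAR, "amount" VARCHAR) ON COMMIT DROP
```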

Regarding “generate reports for each component in the pipeline”, I would have the following system structures:

(a) job: (name, Python package.module.function, some other metadata)
(b) job run: (batch, job, start time, end time, status, message, some other metadata)
(c) data errors: (batch, job, timestamp, dataset, row, column (if known), issue description, …)

(a) and (b) can be automated using a Python wrapper that stores the information in those tables. The “master report” is then composed of all records from the system tables that share a common batch number.
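A minimal sketch of such a wrapper and of the master report, with sqlite3 standing in for the real database. The tables follow (a), (b) and (c) above, but their exact columns, the SQLite-specific INSERT OR REPLACE, and the helpers run_job, log_error and master_report are all hypothetical.

```python
import datetime
import sqlite3

# Hypothetical layouts of the system tables (a), (b) and (c).
SCHEMA = """
CREATE TABLE IF NOT EXISTS job (name TEXT PRIMARY KEY, function TEXT);
CREATE TABLE IF NOT EXISTS job_run (
    batch INTEGER, job TEXT, start_time TEXT, end_time TEXT,
    status TEXT, message TEXT);
CREATE TABLE IF NOT EXISTS data_error (
    batch INTEGER, job TEXT, timestamp TEXT, dataset TEXT,
    row_index INTEGER, column_name TEXT, issue TEXT);
"""


def now():
    return datetime.datetime.now().isoformat()


def log_error(connection, batch, job, dataset, row, column, issue):
    """Record one data error (c)."""
    connection.execute("INSERT INTO data_error VALUES (?, ?, ?, ?, ?, ?, ?)",
                       (batch, job, now(), dataset, row, column, issue))


def run_job(connection, batch, function, *args):
    """Run one ETL step, registering it (a) and logging the run (b)."""
    name = function.__name__
    connection.execute("INSERT OR REPLACE INTO job VALUES (?, ?)",
                       (name, "%s.%s" % (function.__module__, function.__qualname__)))
    start = now()
    try:
        result = function(*args)
        status, message = "ok", None
    except Exception as error:
        # Record the failure; the caller decides whether this means STOP.
        result, status, message = None, "failed", repr(error)
    connection.execute("INSERT INTO job_run VALUES (?, ?, ?, ?, ?, ?)",
                       (batch, name, start, now(), status, message))
    connection.commit()
    return result


def master_report(connection, batch):
    """Compose the master report from all records sharing a batch number."""
    runs = connection.execute(
        "SELECT job, status, message FROM job_run WHERE batch = ? ORDER BY rowid",
        (batch,)).fetchall()
    errors = connection.execute(
        "SELECT job, dataset, row_index, column_name, issue FROM data_error"
        " WHERE batch = ? ORDER BY rowid", (batch,)).fetchall()
    return {"batch": batch, "runs": runs, "errors": errors}


# Example: two hypothetical steps in batch 1, one of which fails.
connection = sqlite3.connect(":memory:")
connection.executescript(SCHEMA)


def check_amounts():
    log_error(connection, 1, "check_amounts", "budget", 2, "amount", "not a number")


def broken_step():
    raise RuntimeError("source file missing")


run_job(connection, 1, check_amounts)
run_job(connection, 1, broken_step)
report = master_report(connection, 1)
```

Because every record carries the batch number, the master report is just two queries filtered on that number; no component has to know about the others.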

As for the transformation, I would postpone it until the decision about the OS architecture is made (there is an ongoing discussion on the #openspending IRC channel).

To sum it up:

1. have a job management/registration/logging system in place
2. have every ETL step as a Python function registered as a proper job (decorator?)
3. have a concept of a “data warehouse” instead of a “connection”, which would provide more metadata, including the jobs, to the ETL function

An example ETL function:

def some_transformation_or_validation(osdw):
    # do something here; osdw.engine is the created, shared engine
    ...
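One possible reading of points 2 and 3, sketched under loud assumptions: a decorator registers each ETL step as a job, and a small “data warehouse” object, rather than a bare connection, is passed to every step so it can reach the shared engine and report data errors for the current batch and job. Everything here (JOBS, job, Warehouse, log_error, run) is hypothetical, a sqlite3 connection plays the role of the engine, and errors are kept in a list where a real system would write them to the data errors table.

```python
import sqlite3

JOBS = {}  # hypothetical job registry: job name -> function


def job(function):
    """Decorator: register an ETL step as a job under its function name."""
    JOBS[function.__name__] = function
    return function


class Warehouse:
    """Hypothetical "data warehouse" handed to every ETL step.

    Besides the shared engine it carries the batch number and the name of
    the job being run, so a step can report data errors on its own.
    """

    def __init__(self, engine, batch):
        self.engine = engine
        self.batch = batch
        self.current_job = None
        self.errors = []  # stand-in for the data errors table

    def log_error(self, dataset, row, column, issue):
        self.errors.append((self.batch, self.current_job, dataset, row, column, issue))

    def run(self, name):
        """Run a registered job with this warehouse as its only argument."""
        self.current_job = name
        try:
            return JOBS[name](self)
        finally:
            self.current_job = None


@job
def some_transformation_or_validation(osdw):
    # Count empty amounts in a (hypothetical) staging table.
    (empty,) = osdw.engine.execute(
        "SELECT count(*) FROM staging_budget WHERE amount IS NULL OR amount = ''"
    ).fetchone()
    if empty:
        osdw.log_error("staging_budget", None, "amount", "%d empty values" % empty)
    return empty


# Example run for batch 1.
engine = sqlite3.connect(":memory:")
engine.execute("CREATE TABLE staging_budget (name VARCHAR, amount VARCHAR)")
engine.executemany("INSERT INTO staging_budget VALUES (?, ?)",
                   [("a", "10"), ("b", ""), ("c", None)])
osdw = Warehouse(engine, batch=1)
empty_count = osdw.run("some_transformation_or_validation")
```

The step itself stays a plain function of one argument; everything about batches and bookkeeping lives in the warehouse object.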


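Composing SQLAlchemy statements is very easy; it just requires you to think in sets, not in records. For example, to count the empty (NULL) fields in a table:

from sqlalchemy import func, select
# assume `fields` is a list of column names (strings),
# `table` is the sqlalchemy Table to be examined and `engine` is an Engine

# count(*) counts all rows while count(column) counts only non-NULL values,
# so their difference is the number of NULL (empty) values in each field
selection = [(func.count() - func.count(table.c[field])).label(field)
             for field in fields]
statement = select(*selection).select_from(table)
with engine.connect() as connection:
    result = connection.execute(statement).one()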
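The set-based trick is that count(*) counts every row while count(column) counts only non-NULL values, so count(*) - count(column) is the number of empty values, and all fields are checked in a single pass. The sketch below runs hand-written SQL of that shape against an in-memory SQLite table so the arithmetic can be checked without a PostgreSQL server; the budget table and its rows are made up.

```python
import sqlite3

connection = sqlite3.connect(":memory:")
connection.execute("CREATE TABLE budget (name VARCHAR, amount VARCHAR)")
connection.executemany("INSERT INTO budget VALUES (?, ?)",
                       [("a", "10"), (None, "20"), ("c", None), (None, None)])

fields = ["name", "amount"]
# One aggregate per field, all evaluated by a single SELECT.
selection = ", ".join('count(*) - count("%s") AS "%s"' % (f, f) for f in fields)
row = connection.execute("SELECT %s FROM budget" % selection).fetchone()
null_counts = dict(zip(fields, row))
print(null_counts)  # {'name': 2, 'amount': 2}
```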

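A fancier version:

from sqlalchemy import func

empty_fields = []   # labels of the NULL-count columns
validation = []     # all validation columns, to go into one SELECT

for field in fields:
    label = "%s_empty_count" % field
    col = (func.count() - func.count(table.c[field])).label(label)
    empty_fields.append(label)
    validation.append(col)

    # … add other validation columns here with their respective labels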

Compose and execute as usual, then check the result columns whose names are in empty_fields for the empty-field results, and the columns with other names (labels) for the results of other validations (regexp?). Basically, you try to express every validation as an aggregate formula over a column and, ideally, issue just one SQL SELECT statement (if you can). A neat feature that might be very helpful here is the FILTER clause for aggregates, which comes in Postgres 9.4 [4]; support for it is in the upcoming SQLAlchemy 1.0 [5].
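A minimal sketch of one validation SELECT with a labelled aggregate per check, split afterwards by label. Assumptions: sqlite3 instead of PostgreSQL, SQL built as strings rather than composed with SQLAlchemy, made-up table and column names, and a GLOB pattern standing in for the regexp check (SQLite defines no regexp() function by default). Portable SUM(CASE ...) is used where PostgreSQL 9.4+ could use an aggregate FILTER clause [4].

```python
import sqlite3


def validation_select(table, fields, numeric_fields):
    """Build one SELECT where every validation is a labelled aggregate."""
    columns = []
    for field in fields:
        # NULL count: count(*) counts rows, count(col) only non-NULL values.
        columns.append('count(*) - count("%s") AS "%s_empty_count"' % (field, field))
    for field in numeric_fields:
        # Values containing anything other than digits or a dot. In
        # PostgreSQL 9.4+ this could be count(*) FILTER (WHERE <condition>).
        expression = ("SUM(CASE WHEN \"%s\" GLOB '*[^0-9.]*' THEN 1 ELSE 0 END)"
                      % field)
        columns.append('%s AS "%s_not_numeric_count"' % (expression, field))
    return 'SELECT %s FROM "%s"' % (", ".join(columns), table)


def run_validations(connection, table, fields, numeric_fields):
    cursor = connection.execute(validation_select(table, fields, numeric_fields))
    row = cursor.fetchone()
    labels = [description[0] for description in cursor.description]
    results = dict(zip(labels, row))
    # Separate the empty-field counts from the other checks by their labels.
    empty = {k: v for k, v in results.items() if k.endswith("_empty_count")}
    other = {k: v for k, v in results.items() if k not in empty}
    return empty, other


connection = sqlite3.connect(":memory:")
connection.execute("CREATE TABLE budget (name VARCHAR, amount VARCHAR)")
connection.executemany("INSERT INTO budget VALUES (?, ?)",
                       [("a", "10"), (None, "1x"), ("c", None), ("d", "2.5")])
empty, other = run_validations(connection, "budget", ["name", "amount"], ["amount"])
```

Adding a new check means adding one more labelled column; the table is still scanned by a single statement.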

Validation against master data tables: just do an outer join with the master data based on the natural key/value, and compose as usual.
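A minimal sketch in plain SQL, with sqlite3 standing in for the real database and a made-up country master table; rowid is SQLite-specific and stands in for a row number column. In SQLAlchemy the same query would be composed with outerjoin() and an is_(None) condition on the master key.

```python
import sqlite3

connection = sqlite3.connect(":memory:")
connection.executescript("""
CREATE TABLE country (code VARCHAR PRIMARY KEY, name VARCHAR);
INSERT INTO country VALUES ('SK', 'Slovakia'), ('DE', 'Germany');
CREATE TABLE staging_budget (country VARCHAR, amount VARCHAR);
INSERT INTO staging_budget VALUES ('SK', '10'), ('XX', '20'), ('DE', '5');
""")

# The LEFT OUTER JOIN keeps every staging row; rows whose natural key has
# no partner in the master table come back with NULL master columns.
unknown = connection.execute("""
    SELECT s.rowid, s.country
    FROM staging_budget AS s
    LEFT OUTER JOIN country AS m ON m.code = s.country
    WHERE m.code IS NULL
""").fetchall()
print(unknown)  # [(2, 'XX')]
```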

SQLAlchemy works with Python 3 as well as Python 2.7.

Hope that helps. Accept my apologies for Bubbles.



[1] https://github.com/ceumicrodata/mETL
[2] http://www.postgresql.org/docs/current/static/sql-copy.html
[3] http://initd.org/psycopg/docs/cursor.html#cursor.copy_from
[4] http://www.postgresql.org/docs/current/static/sql-expressions.html#SYNTAX-AGGREGATES
[5] http://docs.sqlalchemy.org/en/latest/core/functions.html#sqlalchemy.sql.functions.FunctionElement.filter

I brew data
Home: www.stiivi.com
Twitter: @Stiivi

> On 28 Dec 2014, at 13:44, Paul Walsh <paulywalsh at gmail.com> wrote:
> This is mostly targeted at Stefan, creator of Bubbles (http://pythonhosted.org/bubbles/index.html), but I'm posting to the list for other input/general knowledge.
> I’ve had a brief discussion with Rufus about Bubbles, and a look over the documentation. I’m wondering how I could integrate it in my current work on CSV validation, but some areas of the docs are empty or light. At this stage, in theory, it looks like it could provide a lot of the backend for what we want to do, which, in summary, is:
> Take a data source (CSV) and stream it through a validation pipeline:
>     - check for structural issues (missing/extra columns and rows, etc.)
>     - check validation against a json schema
>     - transform the stream in the pipeline (e.g.: remove empty rows before running the JTS validation)
>         - (save the original source and transformed source to file)
>     - generate reports for each “component” in the pipeline
>         - compile the reports from each component into a “master” report
> The API will be usable via Python for integration with existing codebases. Additionally, it will be wrapped as an HTTP service.
> The Bubbles docs cover several relevant areas we’d need (e.g.: http://pythonhosted.org/bubbles/operations.html#field-operations), and there is a small section on custom operations, but it is not entirely clear (to me) how I might use Bubbles to construct our whole pipeline and generate reports in a format of our choosing.
> An important (deal-breaking) issue is Python 2.7 support: 
> I’m currently writing in Python 3, but aim to target 2/3 (we **need** to integrate with Python 2.7 codebases). It is pretty clear that Bubbles is Py3 only, but we thought we’d ask anyway about this, and what it might take to get Bubbles working on 2.7.x.
