## Introduction

The data algebra is a Python system for designing data transformations that can be used in Pandas or SQL. The new 1.3.0 version introduces a lot of early checking and warnings to make designing data transforms more convenient and safer.

## An Example

I’d like to demonstrate some of these features with an example.

Let’s import our packages and build some simple example data.

```
from typing import Dict, Iterable
import numpy as np
import pandas as pd
from data_algebra.data_ops import descr, ViewRepresentation
import data_algebra.test_util
import data_algebra.BigQuery
```

```
d = pd.DataFrame({
    'id': [0, 1, 2, 3, 4],
    'x': [4, 50, 1, 3, 2.2],
    'g': ['a', 'b', 'a', 'a', 'b'],
})

d
```

| | id | x | g |
| --- | --- | --- | --- |
| 0 | 0 | 4.0 | a |
| 1 | 1 | 50.0 | b |
| 2 | 2 | 1.0 | a |
| 3 | 3 | 3.0 | a |
| 4 | 4 | 2.2 | b |

## An Example Problem

Our problem is to compute the median of the values in column `x` for each group of rows identified by the column `g`.

To do this we arrange methods that create new columns into steps of a transformation pipeline. In this case a `descr()` step describes the incoming data structure, an `extend()` node adds the new column, and we add a sorting node (`order_rows()`) for presentation. Node documentation can be found here. Methods, such as `.median()`, operate on column names, values, and even on intermediate results.

### The Solution

With some experience, one can write the data algebra solution as follows.

```
ops = (
    descr(d=d)
        .extend({'xm': 'x.median()'},
                partition_by=['g'])
        .order_rows(['id'])
)
```

The `extend()` and `order_rows()` steps are operators, which have an introduction here. What methods we can use in these nodes mostly follows Pandas and Numpy, and is listed in a table here.

### Applying the Solution

Now let’s apply our specified transform to our example data. The new column `xm` has the correct group medians assigned to each original row.

```
pandas_res = ops.transform(d)

pandas_res
```

| | id | x | g | xm |
| --- | --- | --- | --- | --- |
| 0 | 0 | 4.0 | a | 3.0 |
| 1 | 1 | 50.0 | b | 26.1 |
| 2 | 2 | 1.0 | a | 3.0 |
| 3 | 3 | 3.0 | a | 3.0 |
| 4 | 4 | 2.2 | b | 26.1 |

```
for group in set(d['g']):
    assert np.all(
        pandas_res.loc[pandas_res['g'] == group, 'xm']
        == np.median(d.loc[d['g'] == group, 'x']))
```
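
As a cross-check (outside the data algebra API), the same per-row grouped median can be computed directly in Pandas, where a grouped transform plays the role of `extend()` with `partition_by`. A minimal sketch:

```
# same windowed median via plain Pandas:
# groupby().transform() aligns one value per original row
xm_check = d.groupby('g')['x'].transform('median')
# valid here because d is already in 'id' order, matching pandas_res
assert np.all(pandas_res['xm'].values == xm_check.values)
```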

## In Database

Part of the power of the data algebra is that the transform can be translated into SQL for execution on different databases. For example, we could try to execute this query on Google BigQuery as follows.

We build a database connection and insert our example data. In real applications the data would likely be large, and already in the database.

```
# connect to remote Google BigQuery database
bigquery_handle = data_algebra.BigQuery.example_handle()

# insert example table into database
# in actual application data is usually already there
_ = bigquery_handle.insert_table(
    d,
    table_name='d',
    allow_overwrite=True)
```

The `insert_table()` step issued the needed `CREATE TABLE` statement for us. Now we can translate our pipeline into BigQuery SQL.

```
bigquery_sql = bigquery_handle.to_sql(ops)

print(bigquery_sql)
```

```
-- data_algebra SQL https://github.com/WinVector/data_algebra
-- dialect: BigQueryModel
-- string quote: "
-- identifier quote: `
WITH
`extend_0` AS (
SELECT -- .extend({ 'xm': 'x.median()'}, partition_by=['g'])
`id` ,
`x` ,
`g` ,
PERCENTILE_CONT(`x`, 0.5) OVER ( PARTITION BY `g` ) AS `xm`
FROM
`data-algebra-test.test_1.d`
)
SELECT -- .order_rows(['id'])
*
FROM
`extend_0`
ORDER BY
`id`
```

```
# works
db_res = bigquery_handle.read_query(bigquery_sql)

db_res
```

| | id | x | g | xm |
| --- | --- | --- | --- | --- |
| 0 | 0 | 4.0 | a | 3.0 |
| 1 | 1 | 50.0 | b | 26.1 |
| 2 | 2 | 1.0 | a | 3.0 |
| 3 | 3 | 3.0 | a | 3.0 |
| 4 | 4 | 2.2 | b | 26.1 |

```
assert data_algebra.test_util.equivalent_frames(
    pandas_res, db_res)
```
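
The same pipeline can be targeted at other dialects. As an aside, here is a sketch assuming the SQLite adapter exposes the same `example_handle()` convenience as the BigQuery one (check your installed version); depending on dialect support, the translation may itself emit a warning:

```
import data_algebra.SQLite

# translate the identical pipeline for a different dialect
sqlite_handle = data_algebra.SQLite.example_handle()
print(sqlite_handle.to_sql(ops))
sqlite_handle.close()
```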

## A Variation

If we wanted only one result row per group during our median calculation we would use the following pipeline, replacing the “extend” with a “project” (trying to stay close to Codd’s relational terminology).

```
ops_p = (
    descr(d=d)
        .project({'xm': 'x.median()'},
                 group_by=['g'])
)
```

```
pandas_res_p = ops_p.transform(d)

pandas_res_p
```

| | g | xm |
| --- | --- | --- |
| 0 | a | 3.0 |
| 1 | b | 26.1 |

```
for group in set(d['g']):
    assert (
        pandas_res_p.loc[pandas_res_p['g'] == group, 'xm'].values[0]
        == np.median(d.loc[d['g'] == group, 'x']))
```
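
For comparison (again in plain Pandas, not the data algebra), the one-row-per-group median corresponds to a groupby-aggregate; a minimal sketch:

```
# one row per group via Pandas named aggregation
agg_check = d.groupby('g', as_index=False).agg(xm=('x', 'median'))
assert data_algebra.test_util.equivalent_frames(pandas_res_p, agg_check)
```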

```
# warns!
sql_p = bigquery_handle.to_sql(ops_p)
```

```
/Users/johnmount/Documents/work/data_algebra/data_algebra/db_model.py:1694: UserWarning: BigQueryModel translation doesn't fully support method context: [MethodUse(op_name='median', is_project=True, is_windowed=False, is_ordered=False)]
warnings.warn(f"{self} translation doesn't fully support method context: {non_rec}", UserWarning)
```

```
# indeed, fails
# Notes: https://stackoverflow.com/a/57718190/6901725
try:
    bigquery_handle.read_query(sql_p)
except Exception as ex:
    print(f'caught: {ex}')
```

```
caught: 400 percentile_cont aggregate function is not supported.
(job ID: dc82bb2d-34b7-409c-bb17-9fd7112d8476)
-----Query Job SQL Follows-----
| . | . | . | . | . | . |
1:-- data_algebra SQL https://github.com/WinVector/data_algebra
2:-- dialect: BigQueryModel
3:-- string quote: "
4:-- identifier quote: `
5:WITH
6: `table_reference_0` AS (
7: SELECT
8: `g` ,
9: `x`
10: FROM
11: `data-algebra-test.test_1.d`
12: )
13:SELECT -- .project({ 'xm': 'x.median()'}, group_by=['g'])
14: PERCENTILE_CONT(`x`, 0.5) AS `xm` ,
15: `g`
16:FROM
17: `table_reference_0`
18:GROUP BY
19: `g`
| . | . | . | . | . | . |
```

In BigQuery, the `PERCENTILE_CONT()` function is available only in windowed contexts (where the number of rows returned matches the number of input rows), and not in project/grouping contexts (one row returned per group).

Given how similar the two SQL queries are, the above failure can come as a surprise. But a new feature of the data algebra is that the “translate to SQL” step warns when we have a potential problem. This doesn’t even require a full database handle; the check uses data incorporated into the database model during package assembly.
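
Because the check surfaces as a standard Python `UserWarning`, we can escalate it to a hard error during development. A minimal sketch, using only the standard library warnings machinery (this is not a data algebra feature):

```
import warnings

# fail fast: promote any translation warning to an exception
with warnings.catch_warnings():
    warnings.simplefilter('error', category=UserWarning)
    try:
        bigquery_handle.to_sql(ops_p)
    except UserWarning as ex:
        print(f'translation problem: {ex}')
```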

## Patching The Solution

We can work around the BigQuery limitation by simulating the project-median as an extend-median followed by a pseudo-aggregating project-mean step. However, we feel automating such a conversion would hide too many details from the user.

Let’s try that solution by hand.

```
ops_p_2 = (
    # start with our extend median solution
    ops
        .project({'xm': 'xm.mean()'},  # pseudo-aggregation, as xm is constant per group
                 group_by=['g'])
)
```

```
db_res_p = bigquery_handle.read_query(ops_p_2)

db_res_p
```

| | xm | g |
| --- | --- | --- |
| 0 | 3.0 | a |
| 1 | 26.1 | b |

```
assert data_algebra.test_util.equivalent_frames(
    pandas_res_p, db_res_p)
```

## Conclusion

And that is the newest feature of the 1.3.0 data algebra: per-database SQL translation warnings. I feel the data algebra has a good breadth of correct translations, and it is now also good at saying whether your pipeline is in fact within that correct region. The checking system helps in building and vetting complex statistical queries (such as our t-test example or our xicor example).

## Appendix

We built up `ops_p_2` by adding a step to `ops`. The data algebra has minor optimizers in both the pipeline and SQL generation steps. For example, we can see that in the combined pipeline the intermediate `order_rows()` node has been eliminated.

```
ops_p_2
```

```
(
TableDescription(table_name="d", column_names=["id", "x", "g"])
.extend({"xm": "x.median()"}, partition_by=["g"])
.project({"xm": "xm.mean()"}, group_by=["g"])
)
```
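
We can confirm the SQL side of the same optimization: since the combined pipeline no longer contains an `order_rows()` node, the generated query should contain no `ORDER BY` clause (the exact SQL text will vary by package version):

```
# the SQL for the combined pipeline also omits the intermediate sort
sql_p_2 = bigquery_handle.to_sql(ops_p_2)
assert 'ORDER BY' not in sql_p_2
```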

### Automating the Adaption

We can automate the “extend / project” adaption for re-use by using the `use` API as follows.

```
# define a new pipeline macro: project_by_extend()
# this is a function we would define in a project for re-use
def project_by_extend(
        pipeline: ViewRepresentation,  # incoming pipeline
        methods: Dict[str, str],  # methods key produced by expression
        *,
        group_by: Iterable[str],  # grouping columns
) -> ViewRepresentation:
    """project by first computing an extend, and then
    using pseudo-aggregators in a project.
    Uses max as the pseudo-aggregator to allow use with non-numeric types.
    """
    group_by = list(group_by)
    return (
        pipeline
            .extend(  # do what we wanted to do in project,
                      # instead in a windowed extend
                methods,
                partition_by=group_by)
            .project(  # collect the results into a project,
                       # pseudo-aggregation as values are constant on groups
                {k: f'{k}.max()' for k in methods.keys()},
                group_by=group_by)
    )


# use the project_by_extend() macro while building the pipeline
ops_3 = (
    descr(d=d)
        .use(project_by_extend,
             {'xm': 'x.median()'},
             group_by=['g'])
)

# print the resulting pipeline
ops_3
```

```
(
TableDescription(table_name="d", column_names=["id", "x", "g"])
.extend({"xm": "x.median()"}, partition_by=["g"])
.project({"xm": "xm.max()"}, group_by=["g"])
)
```

```
# confirm the same as the constructed pipeline
assert data_algebra.test_util.equivalent_frames(
    pandas_res_p,
    ops_3.transform(d))
assert data_algebra.test_util.equivalent_frames(
    pandas_res_p, bigquery_handle.read_query(ops_3))
```

### Clean Up

```
# clean up
bigquery_handle.drop_table('d')
bigquery_handle.close()
```
