Building data flows with Celery and SQLAlchemy

I'm here to talk about data processing today. Thank you all for braving the last talk of PyCon before the lightning talks; I'm going to try to be engaging enough not to put you all to sleep, and if you already know a little SQLAlchemy or Celery I'll try to call out the big junctures to wake you up for the next section. I'm Roger Barnes; you can find me on Twitter and email, and I'll put my slides on SlideShare after today.

I'm going to talk a bit about data warehousing and data integration. Looking at the program today, there are a lot of people here who do data a lot more than I do, so in terms of big data and analytics I'm not going to try to cover much of that. What I'm really looking at is a framework for doing data processing within a smaller environment. That's not to say you couldn't scale this up, but this is really about an experience I had taking disparate systems and managing the data together using a pure Python framework. To do that we're going to look at a bit of SQLAlchemy and also Celery, which is something of an unlikely contender here, and how I've used them.

I've been doing software development and other things for quite a while now. I spent 11 years at a business intelligence vendor; while I wasn't directly involved with the software they produced, I certainly cut my teeth on a lot of these concepts at the time. I'm currently contracting, and this particular talk is about a specific reporting system I built for one of those contracts.

First of all, for those who aren't familiar, a bit of an introduction to the idea of data warehousing or data integration. There are lots of different systems in an organization: different data quality levels, different data life cycles, different departments collecting and doing different things at all sorts of levels. What that means is you end up with a very disparate, heterogeneous set of data, and it's potentially used in different ways, which is ambiguous and not actually consistent across the organization. What we really want is some sort of centralized reporting that is timely, accurate, unambiguous, complete, and ideally isn't impacting production systems. A lot of this is about making sure you don't end up with the Excel empire, where Bob in marketing and Jane the IT director are each getting their little bit of data from their little spot; they might be hitting the systems too hard, or they might be getting it completely wrong and reporting on data they thought was accurate or complete when they're not getting the right information at all.

Looking at a data warehouse at the top level, it really is just a central repository of data that integrates all of these disparate systems together. Beneath that there's a huge amount of theory and really good practice in data warehousing that I'm not going to go into today, but that concept of integrating everything together is what I'm going to focus on. This particular slide isn't designed to be readable, so I'm not going to apologize for it yet. It's a slide from the School of Data, and it shows the different skill sets involved in what's called a data processing pipeline; there's a very large range of things, from understanding and analyzing data through to things like governance and compliance and so on.
What I'm actually going to focus on is really the middle bit of that diagram, which is about the extraction and transformation of data. Or, in what I think is a nicer diagram: extracting from multiple systems, transforming that through a staging layer, and then putting it into a central database for consumption.

We're all here for Python, and Python can help do this. I'm probably preaching to the converted: it's really good for rapid prototyping, there's lots of potential for code reuse, and there are a ton of libraries out there, so whether you're parsing XML, scraping web pages, reading CSV files or reading out of databases, there's likely a library somewhere to get at that data. Finally, Python is quite good for decoupling, so the data flow you're managing doesn't have to be tied to the actual processing, and the business logic doesn't have to live in the same place either; good design lets you separate those three concerns quite well.

In terms of existing solutions, I did a survey of this before I embarked on the project, and there's not a whole lot in the Python space around business intelligence. There are some things out there, and I'm happy to hear about more of them if I've missed any, but people do tend to roll their own, and quite often that's because the business problem is quite different each time, and the domain, the scale or the shape of the data really informs what decision you might make about a solution.

There is a recently released framework called Bubbles, formerly Brewery; databrewery.org is where you can find it, and there's a link on the resources slide at the end. It's basically a framework that lets you build up operations and patterns that you can string together in a pipeline, so it's not unlike what I'm going to talk about. I haven't dug very deeply into it, though, so I can't really give you a comparison at this point.

I want to talk quickly about ways to move data around. Often you have no choice about the source data, but once you have your data you could be looking at things like flat files (CSV, XML or what have you); there are a whole lot of NoSQL data stores around, and depending on your data they could be quite useful; and of course you've got your good old relational database systems, which most traditional data warehouses tend to lean on, though this is certainly an area that's changing. I'm going to talk about SQLAlchemy today, but going back to that concept of decoupling, you could take some of the ideas from this framework and build something based on a NoSQL solution; it's really about gluing some concepts together.

So, very quickly, what SQLAlchemy is. Who is familiar with SQLAlchemy at all? Who thinks they've used it enough to be reasonably familiar? Okay, that's good, there's a few people; maybe I can convince the rest of you that it's worth looking at. SQLAlchemy is a Python SQL toolkit in one layer and an object-relational mapper on top of that. What that means is that you can generate SQL for various databases in a declarative way using Python. We're actually not going to look at the object-relational mapper today, because with this sort of low-level data processing, objects and relational mapping won't really help you out. You can fall back to it, and SQLAlchemy does let you jump back and forth between the two if you want, so if you need that, perhaps to be strict about your data definitions, it's something you can also consider.

SQLAlchemy is fairly well established, I think everyone's heard of it, so I won't go on too much about that, and it's well used throughout the industry, so you shouldn't have any concerns that it's some under-supported system. It has really good database support; there are probably obscure databases out there that people are using that aren't supported, so it's certainly worth checking, but it's got the basics well covered. It's also got good Python support, so again, depending on your environment there are lots of options.

SQLAlchemy is something like an onion: it has layers. At the bottom we have the Python DB-API, and SQLAlchemy builds on top of that. It has dialects for dealing with different databases, connection pooling, transaction management, and an SQL expression language plus schemas and types, which is really where the heavy lifting happens in terms of translating how you talk about data in Python into SQL that actually pushes and pulls information out of databases, which is what this slide says. Like I said, the ORM level is somewhat unnecessary when you're doing data flow processing, so we're looking just at the Core. It does bulk operations, you can fine-tune it, and ultimately this stuff is just generating SQL, so if you need to you can dive into lower layers of the SQLAlchemy infrastructure and fiddle with it to make things work better.
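As a rough illustration of those layers, here is a minimal sketch of working at the Core level: the URL selects the dialect and DB-API driver, the engine manages pooling, and engine.begin() handles the transaction. The connection strings and pool settings are placeholders of mine, not anything from the talk.

```python
from sqlalchemy import create_engine, text

# The URL picks the dialect and DB-API driver; swapping backends is largely a
# matter of changing this string.  The Postgres URL below is a placeholder and
# needs the psycopg2 driver installed:
#   create_engine("postgresql+psycopg2://user:password@localhost/warehouse",
#                 pool_size=5)
engine = create_engine("sqlite:///:memory:", echo=True)  # echo logs generated SQL

# engine.begin() hands back a pooled connection wrapped in a transaction that
# commits on success and rolls back on error.
with engine.begin() as conn:
    conn.execute(text("SELECT 1"))
```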
A really quick example of how SQLAlchemy looks. This is creating a table: you create an engine, you have some metadata, and you then create the table against that metadata using the engine you've created. This is just an in-memory SQLite database, so you can run that code and you'd have a database table. Inserting data is simply a matter of having, in this case, a list of dictionaries of key/value pairs and putting them into the table with an insert operation. You can even say vehicle_table.insert() and print that out, and it will give you the string representation of the SQL it would run. Finally, you can pull data back out, and again it's very much a declarative kind of thing where you ask for a select statement with some filtering, and you can order and group and do all the things SQL can do. SQLAlchemy tries to do that across a common set of backends for any imaginable set of functions, and for those it doesn't cover you can always drop in little bits of literal columns, SQL and functions that might be specific to your database. So now we've got SQLAlchemy figured out a little bit.
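The slide code itself isn't reproduced here, but a minimal, runnable sketch along the lines of the example described might look like this. The table and column names are my guesses, and the select uses the modern positional style (SQLAlchemy 1.x wrote it as select([...])):

```python
from sqlalchemy import (Column, Integer, MetaData, String, Table,
                        create_engine, select)

engine = create_engine("sqlite:///:memory:")
metadata = MetaData()

# Define the table against the metadata, then create it via the engine.
vehicle_table = Table(
    "vehicle", metadata,
    Column("id", Integer, primary_key=True),
    Column("make", String(50)),
    Column("wheels", Integer),
)
metadata.create_all(engine)

# Inserting is just a list of dictionaries of key/value pairs.
rows = [{"make": "Kombi", "wheels": 4}, {"make": "Bicycle", "wheels": 2}]
with engine.begin() as conn:
    conn.execute(vehicle_table.insert(), rows)

# Printing the construct shows the SQL it would run.
print(vehicle_table.insert())

# Pulling data back out is just as declarative: filter, order, group as needed.
query = (select(vehicle_table.c.make, vehicle_table.c.wheels)
         .where(vehicle_table.c.wheels >= 4)
         .order_by(vehicle_table.c.make))
with engine.connect() as conn:
    for row in conn.execute(query):
        print(row)
```

Printing the insert construct gives the parameterized SQL string, which is handy for checking what a processor is about to run.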

Thinking back to our data processing, there's this idea of going from the extraction stage through a series of transformations, and every one of those should be encapsulated into a unit of work. Extracting from this CSV file into this table, extracting from this database into this table, or a transform that adds a derived column to an existing table, can all be considered single operations. To that end we have the idea of a processor. Here are some example possibilities of generic processor types; you might even have report types that are processors as well.

Now we're going to get a bit dirty and look at some code. This is a base processor, and it's really just a shell at this point: if you call dispatch on this processor class, which is an abstract one, it will ultimately call the run method, which you'll need to implement in some sort of subclass. The next logical extension of a processor is one that actually processes against a database, so here we subclass the base processor and now need to provide a DB class, and when the run method gets called a session will have been set up for our run method to use.

Going slightly sideways for a moment, we have things like mixins; you can have, say, a CSV extract mixin, which encapsulates giving you a reader for a given input file, and so we can combine these building blocks at a class level into things that will extract from a file and put it into a database. I should point out that these code examples are functional, but they are somewhat simplified from what I actually ended up with in the project I worked on; there are certainly some additional possibilities here.

So here's a concrete example of an extract. It uses the CSV extract mixin and the database processor, and what it does is use SQLAlchemy to create a table instance, create columns based on the first row of data from your CSV file, and then insert every row beneath that into the table. It's a very naive but complete example of extracting from a CSV file into a table, and what we now have is a single processor that can do that one job repeatedly. In this case it actually tries to create the table as well, so you might need something that either resets it or is conscious of that possibility and decides what to do from there.
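The actual slide code isn't reproduced here, so this is a simplified sketch of what such processor classes might look like. The class names, the warehouse URL and the use of a Core connection in place of the session are my assumptions rather than the talk's code:

```python
import csv

from sqlalchemy import Column, MetaData, String, Table, create_engine


class BaseProcessor(object):
    """A single unit of work: dispatch() is the entry point, subclasses
    implement run()."""

    def dispatch(self, **kwargs):
        return self.run(**kwargs)

    def run(self, **kwargs):
        raise NotImplementedError


class DatabaseProcessor(BaseProcessor):
    """A processor that runs against a database.  The 'session' the talk
    mentions is approximated here with a Core connection opened before run()."""

    db_url = "sqlite:///warehouse.db"   # assumption: a single warehouse database

    def dispatch(self, **kwargs):
        self.engine = create_engine(self.db_url)
        self.metadata = MetaData()
        with self.engine.begin() as connection:
            self.connection = connection
            return self.run(**kwargs)


class CSVExtractMixin(object):
    """Mixin that knows how to hand back a csv reader for an input file."""

    def get_reader(self, input_file):
        return csv.reader(open(input_file, newline=""))


class CSVExtractProcessor(CSVExtractMixin, DatabaseProcessor):
    """Naive but complete: build a table from the CSV header row, then insert
    every remaining row into it."""

    table_name = "extract_data"         # assumption

    def run(self, input_file):
        reader = self.get_reader(input_file)
        header = next(reader)
        table = Table(self.table_name, self.metadata,
                      *[Column(name, String(255)) for name in header])
        table.create(self.connection, checkfirst=True)
        rows = [dict(zip(header, row)) for row in reader]
        if rows:
            self.connection.execute(table.insert(), rows)
        return len(rows)   # the row count comes back as the processor's result
```

Usage would then just be something like CSVExtractProcessor().dispatch(input_file="sales.csv"), with the filename being whatever source you're extracting.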
Thinking ahead a little further, there's the concept of transforming data: you already have a table, and you might want to add an extra column that is computed from other information already in that table. So here we have an abstract transform that's given an input table (which is also the target table), some sort of key column that lets you keep track of rows, the columns you want to select to do your computation, and the target column you're actually promising to provide. We then have an abstract process flow where, having selected certain rows out of your table, you apply a process to them and they get written back out to the database. I'm having to do slide-driven development here: there's a lot of additional code potentially involved in doing that, and if anyone wants more code snippets I'm happy to provide them. It's not something I've extracted into an open source form at this point, though, so I can't point you straight at it.

A concrete implementation of a transform might be the derive-foo transform. We have a sales table keyed by transaction ID, we know we need the location and the user name to derive foo, and we're going to write out a target column called foo. All we have to implement is process_row, which calls upon some external business logic to derive foo from the location and user name provided. Again, this is quite a simplified example, and it might be that your abstract processor only updates values that haven't already been derived, so if you've previously extracted and derived some information, this would let you update just the rows that need derivation and haven't had it before. Are there any questions at this point? Maybe we should actually save them for the end; that'll be easier.
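Again, as a sketch rather than the talk's actual code, here is one possible shape for the abstract transform and the concrete derive-foo version, reusing the DatabaseProcessor from the previous snippet. The derive_foo function stands in for the external business logic, and the only-update-underived-rows behaviour is the optional refinement just mentioned:

```python
from sqlalchemy import Table, select


def derive_foo(location, username):
    """Stand-in for the external business logic mentioned in the talk."""
    return "{}/{}".format(location, username)


class AbstractTransform(DatabaseProcessor):
    """Select key + source columns from a table, compute a target column for
    each row, and write the result back to the same table."""

    table_name = None        # input table, which is also the target table
    key_column = None        # lets us track which row we are updating
    source_columns = ()      # columns needed for the computation
    target_column = None     # the column this transform promises to provide

    def run(self):
        table = Table(self.table_name, self.metadata,
                      autoload_with=self.engine)
        columns = ([table.c[self.key_column]] +
                   [table.c[name] for name in self.source_columns])
        # Only rows that have not been derived yet.
        query = select(*columns).where(table.c[self.target_column].is_(None))
        rows = self.connection.execute(query).mappings().all()
        for row in rows:
            self.connection.execute(
                table.update()
                .where(table.c[self.key_column] == row[self.key_column])
                .values({self.target_column: self.process_row(row)}))
        return len(rows)

    def process_row(self, row):
        raise NotImplementedError


class DeriveFooTransform(AbstractTransform):
    table_name = "sales"
    key_column = "transaction_id"
    source_columns = ("location", "username")
    target_column = "foo"

    def process_row(self, row):
        return derive_foo(row["location"], row["username"])
```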

So that's kind of it for SQLAlchemy: we've defined these processor units of work that will do jobs, that will extract and transform, and at this point you could run them in an order that you know and get information out the other end from a consistent and coherent set of tables. So now let's have a look at Celery. Wake up! Celery is traditionally used in web applications for back-end processing of things where, say, a user logs in and you might need to go off and do something like download their Facebook friends, and you don't need to interrupt the user with that, so you do it in the background. Celery is a distributed task queue: you push tasks into the broker and a bunch of workers pick them up and do the work. That's pretty much all Celery is.

At its core Celery has the concept of a task, and what we have here is a very simple wrapper that takes a Celery task and extends it so that, given a processor like the ones we've just looked at, it will actually run it. At the top we have the abstract version of the class; the two lines near the bottom say that the derive-foo task is simply a processor task that runs the derive-foo transform. Then the way you run a task with Celery is to say derive_foo_task.apply_async(), and that gets sent off to the Celery broker to be run in the background. So now we've gone from a transform that can be run to a task that can be run asynchronously. How can this help us from here?

On top of Celery there is a thing called canvas, and canvas gives you a way to combine these tasks together. You can create groups, so you can say I want this task and this task and this task to run, they don't depend on each other, and they can all happen in parallel. But you might need to say that tasks do depend on each other, and so you have the concept of chains that run in series. You can combine series of groups and chains, and there are also things called chords and maps and various others that let you build a graph-shaped series of dependencies. This isn't necessarily about getting performance out through parallelization, although that is something you can get; it's more about putting things in an order where you know this task depends on the output of that one, so run that one beforehand. This example here is from the Celery docs: the new-user workflow will create a new user, and the pipe here actually creates a chain, piping that into a group which, in parallel, imports the contacts and sends a welcome email; then we run that workflow given a set of input.

So now we're looking at a data processing flow; we've talked a lot about the concepts, so here is one fairly contrived example where you have data from different sources, and we group together the extract-time concerns. We then have a series of transforms that copy that into a transform layer, so we leave our extract layer alone and start transforming in a separate layer. That gives us some options: if we want to blow away that middle layer, we don't have to re-extract everything all over again; sometimes having multiple copies through the pipeline gives you that flexibility. We have just one simple transform here that will normalize the currency of your sales data before joining it all together into some reporting structure, and then we actually run some aggregations and an exception report that will perhaps look at data quality. That's the visual representation of it, and these boxes and arrows are effectively groups and chains. The way that looks in code (I hope it's readable, and there's a sketch of the idea below) is that we define three flows: an extract flow, which is one group; a transform flow, which is another group that also happens to contain a couple of chains; and a load flow, which is the quality task and our aggregation tasks. Our all flow is then the chaining of all three of those sub-flows together.
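Here is a minimal sketch of both pieces, the task wrapper and the canvas wiring, written against current Celery rather than the 2013-era API. The app setup, broker URL, stub task names and file name are placeholders of mine, and the immutable .si() signatures reflect the point made later in questions that these tasks pass data through the warehouse database rather than through task arguments:

```python
from celery import Celery, Task, chain, group

# Broker/backend URLs are placeholders; the talk used both Redis and RabbitMQ.
app = Celery("flows", broker="redis://localhost:6379/0",
             backend="redis://localhost:6379/1")


class ProcessorTask(Task):
    """Abstract wrapper: a Celery task that runs a single processor."""
    processor_class = None

    def run(self, **kwargs):
        return self.processor_class().dispatch(**kwargs)


class DeriveFooTask(ProcessorTask):
    name = "flows.derive_foo"
    processor_class = DeriveFooTransform    # from the earlier sketch


derive_foo_task = DeriveFooTask()
app.register_task(derive_foo_task)


# Stand-ins for the other processors in the example flow.
@app.task
def extract_sales(input_file):
    return CSVExtractProcessor().dispatch(input_file=input_file)


@app.task
def copy_to_transform_layer():
    pass


@app.task
def normalise_currency():
    pass


@app.task
def data_quality_report():
    pass


@app.task
def build_aggregates():
    pass


# Groups run in parallel, chains run in series (a | b is shorthand for chain).
# Immutable .si() signatures stop Celery passing one task's result to the next;
# these steps read their input from the warehouse database instead.
extract_flow = group(extract_sales.si("sales.csv"))
transform_flow = group(
    chain(copy_to_transform_layer.si(), normalise_currency.si()),
    derive_foo_task.si(),
)
load_flow = group(data_quality_report.si(), build_aggregates.si())

all_flow = chain(extract_flow, transform_flow, load_flow)
```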
Running that is then simply a matter of all_flow.apply_async(). Once you've done that you've sent off all these tasks; they're sitting in the broker, ticking over, the workers are picking them up, and things can go wrong and so on, so we need to think about ways to monitor this and see how it's going. Celery has within it an events view, which is a simple curses-based interface (I've had to blank some things out in the screenshot, which is what the gaps mean). You can actually see how the flow is running: you can see where there are chords waiting to unlock because there are processes still in action, and you can see that one of the transforms has started and some others have already finished. So as the process is flowing, as soon as you've kicked it off, you can look at this view and see how things are ticking over. You can also navigate into the results of each of these. I didn't point out before that each of your tasks can return a value; in the example I just returned the row count, but you could also return, for example, a dictionary containing multiple bits of information, maybe some messages, some kind of metadata about how things are going.
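As a small, hedged illustration of that last point (the curses monitor itself is started with the celery events command): a task's return value ends up in the result backend and can be read back, and the metadata keys below are made up for illustration.

```python
# A task's return value ends up in the result backend and can be read back.
async_result = derive_foo_task.apply_async()
print(async_result.get(timeout=60))   # e.g. 42, the row count from the transform


class DeriveFooWithStats(DeriveFooTransform):
    """Same transform, but returning a dictionary of metadata rather than a
    bare row count (the keys here are illustrative)."""

    def run(self):
        updated = super().run()
        return {"rows_updated": updated,
                "messages": ["derived foo for previously underived rows"]}
```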

There's another view into things called Flower, which is more modern, although I found it was less real-time because it doesn't refresh as often. It gives you a web-based view of much the same thing.

As I said before, this is really a simple example; turning it up to 11, you can add all sorts of things to this infrastructure. I've added a requires-and-provides structure, where each task says what it provides and what it needs in order to do that, and there's a unit test that, once you've defined a flow, asserts that every task in the pipeline actually gets provided what it needs from upstream and provides what it's supposed to downstream (there's a sketch of the idea below).
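That flow-validation check isn't shown in the talk, so here is one possible shape for it; the attribute names, the dotted table/column identifiers and the flat list of steps are assumptions of mine:

```python
# A possible shape for the requires/provides check described above.
def check_flow(steps):
    available = set()
    for step in steps:
        missing = set(step.requires) - available
        assert not missing, "%s is missing %s" % (step.__name__, sorted(missing))
        available |= set(step.provides)


class ExtractSales:
    requires = ()
    provides = ("staging.sales",)


class DeriveFoo:
    requires = ("staging.sales",)
    provides = ("staging.sales.foo",)


class SalesReport:
    requires = ("staging.sales.foo",)
    provides = ("report.sales",)


def test_flow_dependencies():
    check_flow([ExtractSales, DeriveFoo, SalesReport])
```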
Doing things like incremental data loads lets you, rather than doing wholesale replacements through your data flow, look for, say, just the last 24 hours of data instead of reloading everything over and over again. That has a lot of potential to speed up processing time: you're just loading the new data, just processing the new data, just reporting on it (perhaps you have to rebuild the reports at the end because of aggregations), but it certainly can speed things up. This particular reporting system took a process that, for whatever reason, took four hours to run, and now, after an initial run of several minutes, it can potentially run in several seconds.

You can also parameterize these flows and say, well, I have a nightly run that does everything, but maybe every four hours I just want to refresh the extracts and rerun the data quality report, which only needs the extracts to have run; you can customize and choose which bits of this you want to do. Tracking flow history, actually storing the results each time you run your flow, means you can keep track, just as you would with any other batch system, of how things have gone historically. And of course, with the power of Python, you can hook into just about anything else: if you want to do some fancy natural language processing or scientific computing in your transforms, you certainly could. Really, though, this particular system is about getting data into a well-enough unmangled form to do those analytics in a separate environment.

I'm running quite quickly through this, so to summarize: we had an intro to the concept of data warehousing or data integration, and I've shown a simple example of how you would process data, starting with SQLAlchemy and then stringing it all together using Celery's canvas. I've got a list of resources here, which will go up on SlideShare, and that's actually me done. Any questions?

Hi, thanks for the talk, it was great. I actually do very similar things to what you're describing at my work, although my ORM of choice is the Django one, because that's what I knew, and I've thought about whether or not I should move to SQLAlchemy. A good thing with the Django one is that I've got South for my migrations. What's the quality of SQLAlchemy's migration handling; does anything like that exist?

So the question is about the quality of tools that can do schema migration. Yes, there are two migration tools for SQLAlchemy. One is called sqlalchemy-migrate, and I believe it's quite functional but is now sort of unofficially deprecated in favour of one called Alembic, which will also do migrations. It's something I didn't really touch on, and it's a very good question, because often your data changes and then your flows need to update accordingly. One of the things I've done is put a poor man's migration into these flows, where if a new column appears it will automatically be put through your middle staging layers and you can expect it to show up on the report at the other end. Now, you might not be comfortable with that, and then you could look at some more specific migration tools. At the moment it seems to be working well; this thing is fairly permissive in that it adds columns when it knows it needs to, and then by the time you get to the end there's a certain amount of testing and validation reporting in place to say this number should have come out, or this column should at least have come out as well.

All right, it's nice to see someone talking about SQLAlchemy. It's probably way more powerful than Django's ORM, though not as expressive, I think; I've used both, and it's pretty fantastic, we use it in production. My question is that you had a very structured way about it, you know, you had your classes and your subclasses, and I was getting a bit of a Java feel. I'm not trying to insult you or anything, but is this because you often do these kinds of things and you've got a very structured way of doing it, or are you just a very structured thinker?

Exactly. It kind of evolved that way. First of all, you've uncovered my dirty little secret: the 11 years I was at the business intelligence vendor was bad Java. But I actually started out writing this quite functionally, with functions to do a lot of things, and as soon as I wanted to refactor the common code I found myself in classes, so there's a mixin for this, and there are mixins for updating blank columns or for selecting rows out of a database. In some ways it's probably got a couple more layers than I'd like, and it's time to refactor back down; perhaps this example shows that it shouldn't get any more subclassed than that, really. Once you've got those layers at the top you then just use them, and you have a flat structure. So it's not necessarily strictly the case, but what happened in my case is that that worked out best, and the person I was working with didn't disagree, and he knows some Python too, so it seemed to work. I'd be open to hearing alternatives; I'm not saying it's perfect by any means.

How do you pass data between each stage in the pipeline? Can you take the output of one and pipe it into the next stage? How have you found the best way to do that?

Very good question. You can actually have the tasks receive input and provide output, but for the purposes of these flows they don't take any parameters or produce any result that's consumed by other steps. It's certainly possible, but what we decided was that we only have one database that is the core of all this warehousing; once everything's extracted, the rest of the process is in the same database, so at that point we know what tables are in there and we let the data speak for itself. In more complicated cases we actually put timestamps on things: we say the last modified record, or even the last modified column (we actually have per-column timestamps) was at this time, so let's look back at the columns that feed it, get any data that's newer or refreshed, and rerun those. But it's purely that the data lives in the database, and the transforms, by virtue of the fact that they know they have to come one after another, trust that the last step has put the data in there. As I said, it is possible for an upstream process to communicate something to another task, to say "you need to know about this", but it's not a streaming-data arrangement; it's really pick it up, put it down, pick it up, put it down, in terms of its structure.

Are there any other questions for Roger?

I know there are a fair few options with Celery for how you actually pass messages around, whether RabbitMQ or Redis, and I presume you can use the database itself. Did you actually need to care about that side of things, or was there any particular reason you chose one solution over another?

Not particularly, because we're really using Celery in such a different way: things like concurrency, how it handles messages, and whether you can have different kinds of queues weren't a factor. I've used both RabbitMQ and Redis, and at one point I was even using SQLAlchemy itself as the back-end broker, but that wasn't full-featured, so I dropped it, simply because I wasn't getting events and broker-level information in the monitoring. Redis and RabbitMQ have both been fine. A lot of the decision-making came down to whether some of these ran better on Windows or not.
I don't use Windows, but my colleague does, so sometimes that was a factor; but both actually work just fine. Anybody else? Well, Roger, here is a packet of the coffee that has been going around the conference, and here is the PyCon Australia conference mug. Please join me in thanking Roger Barnes. Thanks for your time, everyone.