Splitting tasks in an APS / PDW
Working in BI environments, I have repeatedly faced the task of copying and repartitioning huge tables (huge in both structure and data volume) while removing duplicate records on a PDW. In such cases, splitting the task into smaller pieces has proved quite useful. Below I describe the approach by means of a situation I faced recently: copying, restructuring and cleansing (that is, removing all duplicate records from) a considerably large table.
Recently, as described in my previous blog article on the parallel execution of SSIS packages, I had to process the records of several years and consolidate them into one table. This meant several billion records in total, with around 60 columns. Furthermore, as mentioned at the beginning of this article, duplicate records had to be removed.
On a PDW, as in my example, with one or more partitioned tables as the source, the tool of choice for this kind of task would normally be a CTAS (“create table as select”), which on the one hand provides the structure of the target table and on the other hand removes all duplicate records. However, executing a complex CTAS like this on the amount of data mentioned above will most likely overflow tempdb after several hours of processing. Even split into one query per year, the operation would not succeed. Below I describe the approach I found most suitable for the task at hand.
First of all, since this kind of task is not a one-off but has to be performed over and over again for various tables with various structures, I decided to implement a fairly generic solution consisting of an SSIS package on the one hand and a database structure that stores process-specific settings on the other.
The general idea behind this solution was to query the records from the source in a pre-partitioned way. The following is a rough outline of the steps to be performed, given that the data to be processed resided in numerous partitioned tables on a PDW and the destination was to be a single (likewise partitioned) table on the same PDW:
- Create a destination table with the desired structure and partitioning, covering the full partitioning range of the data to be processed (in my case: 36 months in three consecutive years, plus one additional partition covering the month before and after that range).
- Define the source queries (plain queries only, not CTAS!) with both the source and the destination structure in mind: the queries should make use of the source partitioning for good performance, and their result should match the destination structure. Furthermore, each query should already take care of the duplicate filtering, which can easily be achieved by a CTE featuring a ROW_NUMBER() OVER (PARTITION BY [key] ORDER BY [key]) AS [row], followed by a SELECT … FROM CTE WHERE [row] = 1.
- Create a configuration table which stores (besides a unique configuration ID) the name of the destination table, the source query and, if applicable, the connection strings for the source and destination environments.
- Create an SSIS package which does the processing:
  - Receive the configuration (destination table, connection string, query, etc.) as parameters.
  - Connect to the destination environment and read the partitioning and hash columns as well as the range direction of the destination table from the corresponding system views into variables.
  - Furthermore, read the partition ranges and the partition numbers of the destination table into a list (object variable).
  - Define a CTAS statement for a new table (in my case with the suffix _tmp), based on the column and partitioning structure just read, wrapped around the configured SELECT statement and with placeholders for the range to be copied. This should look something like the following (note that the derived table needs an alias):
  - CREATE TABLE MyTable_tmp WITH (DISTRIBUTION = HASH(MyDistCol), PARTITION (MyPartCol RANGE MyPartRangeDir FOR VALUES (<MyFirstRangeStart>, <MyFirstRangeEnd>))) AS SELECT * FROM (<MySourceQuery>) AS src
  - Loop through all partition ranges of the destination table and, for each range:
    - Fill the placeholders of the CTAS statement with the current range values provided by the loop.
    - Execute the CTAS.
    - If applicable: create another “dump” table with the same partitioning as the destination table and switch the partition of the current range out of the destination table into the dump table.
    - Switch the partition of the _tmp table into the destination table.
    - Drop the _tmp table and proceed in the same way with the next partition.
- Additionally, since I didn’t intend to start the package manually for every source table / configuration ID, I created a master package whose only task is to read all configurations not yet processed, loop through them and pass the configuration details to the processing package.
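The statement generation at the heart of the processing package can be sketched outside SSIS. The following Python sketch only builds the SQL strings for one partition range and does not connect to anything. All names in it (PartitionRange, dedup_query, partition_statements, the example tables and columns) are hypothetical. It also makes a few assumptions that are not part of the outline above: the destination uses RANGE RIGHT, the duplicate filter is written as a derived table instead of a CTE so that it can be nested inside the CTAS, a range filter on the partitioning column is added to the CTAS, and the data of the current range therefore ends up in partition 2 of the _tmp table.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class PartitionRange:
    number: int  # partition number in the destination table
    start: int   # lower boundary (inclusive, assuming RANGE RIGHT)
    end: int     # upper boundary (exclusive, assuming RANGE RIGHT)


def dedup_query(source_table: str, columns: list[str], key_cols: list[str]) -> str:
    """Build a SELECT that keeps exactly one row per key via ROW_NUMBER()."""
    cols = ", ".join(columns)
    keys = ", ".join(key_cols)
    return (
        f"SELECT {cols} FROM ("
        f"SELECT {cols}, ROW_NUMBER() OVER (PARTITION BY {keys} ORDER BY {keys}) AS rn "
        f"FROM {source_table}) AS d WHERE rn = 1"
    )


def partition_statements(dest: str, dist_col: str, part_col: str, source_query: str,
                         rng: PartitionRange, dump_table: str | None = None) -> list[str]:
    """Return the statements for one loop iteration: CTAS, switch(es), drop."""
    tmp = f"{dest}_tmp"
    statements = [
        f"CREATE TABLE {tmp} WITH (DISTRIBUTION = HASH({dist_col}), "
        f"PARTITION ({part_col} RANGE RIGHT FOR VALUES ({rng.start}, {rng.end}))) "
        f"AS SELECT * FROM ({source_query}) AS src "
        # Assumption: restrict the copy to the current range.
        f"WHERE {part_col} >= {rng.start} AND {part_col} < {rng.end}"
    ]
    if dump_table is not None:
        # Optional: move the existing data of this range out of the destination.
        statements.append(
            f"ALTER TABLE {dest} SWITCH PARTITION {rng.number} "
            f"TO {dump_table} PARTITION {rng.number}"
        )
    # With the two boundaries (start, end) and RANGE RIGHT, the rows of the
    # current range sit in partition 2 of the _tmp table.
    statements.append(
        f"ALTER TABLE {tmp} SWITCH PARTITION 2 TO {dest} PARTITION {rng.number}"
    )
    statements.append(f"DROP TABLE {tmp}")
    return statements


# Example with made-up table, column and boundary values.
query = dedup_query("dbo.Sales_2013", ["SaleId", "DateKey", "Amount"], ["SaleId"])
ranges = [PartitionRange(2, 20130101, 20130201), PartitionRange(3, 20130201, 20130301)]
for rng in ranges:
    for statement in partition_statements("dbo.Sales", "SaleId", "DateKey", query, rng):
        print(statement + ";")
```

In the real package the list of ranges would come from the system views of the destination table rather than being hard-coded, and each statement would be executed against the PDW instead of printed.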
The benefits of this approach are numerous. First, it provides a well-performing way of removing duplicates. Second, it is re-runnable: if the processing fails for one reason or another, you can resume where the failure occurred and do not have to start again from the beginning. Finally, it makes it possible to repartition vast amounts of data, e.g. if the source is partitioned by month and the destination has to be partitioned by week.
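The re-runnable behaviour can be illustrated with a small Python sketch of the master loop. How progress is actually tracked is not described above, so the `done` set of finished (configuration ID, partition number) pairs is purely an assumption for illustration; in practice this could be a status column in the configuration table. The names run_pending and fake_process are hypothetical, and the processing step is simulated.

```python
from __future__ import annotations

from typing import Callable


def run_pending(configs: dict[int, list[int]],
                done: set[tuple[int, int]],
                process: Callable[[int, int], None]) -> None:
    """Process every (config_id, partition) pair that is not yet in `done`."""
    for config_id, partitions in configs.items():
        for partition in partitions:
            if (config_id, partition) in done:
                continue  # already finished in an earlier run
            process(config_id, partition)  # may raise; then nothing is recorded
            done.add((config_id, partition))


# Simulated processing step that fails once for configuration 1, partition 3.
processed: list[tuple[int, int]] = []
fail_once = {(1, 3)}


def fake_process(config_id: int, partition: int) -> None:
    if (config_id, partition) in fail_once:
        fail_once.discard((config_id, partition))
        raise RuntimeError("simulated failure")
    processed.append((config_id, partition))


configs = {1: [2, 3, 4], 2: [2]}
done: set[tuple[int, int]] = set()

try:
    run_pending(configs, done, fake_process)  # stops at (1, 3)
except RuntimeError:
    pass

run_pending(configs, done, fake_process)      # resumes at (1, 3)
print(processed)
```

Because a pair is only recorded after its step has completed, the second run skips the partition that was already finished and continues with the one that failed.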