Google BigQuery Data Update Optimization
When an engineer first steps into the Google BigQuery world, the very first impression is the fear of runaway cost. BigQuery gives you great power, and it comes with great responsibility: the pay-as-you-go model. From a business intelligence architect’s perspective, “cost” becomes a key parameter, as important as “duration”, when designing a data warehouse. Hence, we gotta keep optimizing!
Issue with Data Update:
Google Cloud Storage is our data gathering layer. Our Kafka connector listens to CDC or JDBC changes from the source tables and feeds GCS with Avro files at frequent intervals. We can query this data directly from BigQuery by creating external tables.
The goal is simple: pull only the new & updated data from GCS and merge it into our actual BigQuery table. However, we face two problems when doing so: scanning both the GCS Avro files and the actual table makes the cost skyrocket.
Let’s say there are 500 GB of Avro files in GCS holding all the change logs, and the BigQuery table to be updated is 100 GB in size. Even if only 100 MB of new data needs to be applied, the whole GCS data is scanned when we run a select query against the external table. And even if we filter down to that 100 MB when updating, all of the BigQuery table is scanned as well. 500 + 100 = 600 GB of cost on every run is crazy.
We solved the first problem, reading only the necessary files from GCS, with hive partitioning. We also solved the second problem, scanning the whole table when updating, which is what I am about to detail.
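For context, here is a minimal sketch of how such a hive-partitioned external table could be defined; the bucket path, the partition key name (dt) and the folder layout are assumptions for illustration, not our real configuration:
-- Hive-partitioned external table over the GCS Avro files (illustrative paths)
CREATE OR REPLACE EXTERNAL TABLE `ciceksepeti-dwh.ExtDataset.STProductView`
WITH PARTITION COLUMNS (
  dt DATE  -- hive partition key, derived from folder names like .../dt=2021-06-01/
)
OPTIONS (
  format = 'AVRO',
  uris = ['gs://example-bucket/STProductView/*'],
  hive_partition_uri_prefix = 'gs://example-bucket/STProductView'
);
With a definition like this, a query that filters on dt reads only the matching GCS folders instead of all 500 GB.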
Table partitioning is an important feature in BigQuery. When selecting data from a table, the cost drops drastically if you filter on the partition, because only the filtered partitions are scanned, nothing more. The same applies when merging data.
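For example, if the destination table is partitioned by a DATE column, only the partitions matching the filter are billed (the column list below is illustrative, not our real schema):
-- Destination table partitioned by PartitionColumn (simplified column list)
CREATE TABLE IF NOT EXISTS `ciceksepeti-dwh.DestinationDataset.DimProduct`
(
  ProductId       INT64,
  ProductName     STRING,
  UpdateDate      TIMESTAMP,
  PartitionColumn DATE
)
PARTITION BY PartitionColumn;
-- Scans every partition of the table:
SELECT ProductId, ProductName
FROM `ciceksepeti-dwh.DestinationDataset.DimProduct`;
-- Scans only a single partition:
SELECT ProductId, ProductName
FROM `ciceksepeti-dwh.DestinationDataset.DimProduct`
WHERE PartitionColumn = DATE '2021-06-01';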
Let’s stick with the 500 GB / 100 GB example above. We have 100 MB of data to be updated, and we also know which partitions those records belong to. So we detect the partitions about to be updated, collect them into an array (prPartitionFilter) and use it in the merge script (PartitionColumn in UNNEST(prPartitionFilter)).
DECLARE prPartitionFilter ARRAY<DATE>;
CREATE OR REPLACE TEMP TABLE TmpSourceData
PARTITION BY PartitionColumn
AS
-- contains only 100MB of new & updated data, pulled from GCS
SELECT *
FROM `ciceksepeti-dwh.ExtDataset.STProductView`;
-- List of partitions of new & updated data
SET prPartitionFilter = (SELECT ARRAY_AGG(distinct PartitionColumn) as PartitionFilter FROM TmpSourceData);
MERGE INTO `ciceksepeti-dwh.DestinationDataset.DimProduct` main
USING TmpSourceData tmp on main.ProductId = tmp.ProductId
AND main.PartitionColumn in UNNEST(prPartitionFilter)
-- only the related partitions are scanned from the Destination table
WHEN MATCHED THEN
UPDATE SET
...
This single line of UNNEST(prPartitionFilter) reduces the merge cost from 100 GB to a few hundred MBs. In summary, more than 99% of the cost is eliminated.
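If you want to verify the effect, the bytes billed per job can be checked in INFORMATION_SCHEMA; a sketch below, assuming the jobs run in the US region (adjust the region qualifier to yours):
-- Recent MERGE jobs and their billed bytes
SELECT
  creation_time,
  job_id,
  total_bytes_billed
FROM `region-us`.INFORMATION_SCHEMA.JOBS_BY_PROJECT
WHERE creation_time >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)
  AND statement_type = 'MERGE'
ORDER BY creation_time DESC;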
Issue with Using Data:
When it comes to using the data, we again need to avoid a full scan. This time, flag tables help us process only the new & updated data.
We log the datetime of every run of every procedure (Flag.DimNewDataFlag).
--==========================================
-- Flag update for next run
--==========================================
MERGE INTO Flag.DimNewDataFlag main
USING (SELECT 'DatasetName' as DWDataset
,'DestinationTableName' as DWTable
,prStartDate as ETLDate ) tmp
on main.DWDataset = tmp.DWDataset
and main.DWTable = tmp.DWTable
WHEN MATCHED THEN
UPDATE SET main.ETLDate = tmp.ETLDate
WHEN NOT MATCHED THEN
INSERT (DWDataset,DWTable,ETLDate)
VALUES (tmp.DWDataset,tmp.DWTable,tmp.ETLDate);
Besides the run times, we also log the partitions affected by every run (Flag.TablePartitionsFlag).
--==========================================
-- Updated partitions log
--==========================================
insert into Flag.TablePartitionsFlag
SELECT * FROM (
SELECT
'DatasetName' as DWDataset
,'DestinationTableName' as DWTable
,prStartDate as ETLDate
,prPartitionFilter AS PartitionList)
where PartitionList is not null;
These flag tables allow every procedure to run in parallel and calculate only the necessary data.
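For reference, a minimal sketch of how these flag tables could be defined; the column names come from the scripts in this post, while the exact types are my assumptions:
-- One row per (dataset, table): the last successful run time of its procedure
CREATE TABLE IF NOT EXISTS Flag.DimNewDataFlag
(
  DWDataset STRING,
  DWTable   STRING,
  ETLDate   TIMESTAMP
);
-- One row per run: the partitions touched by that run
CREATE TABLE IF NOT EXISTS Flag.TablePartitionsFlag
(
  DWDataset     STRING,
  DWTable       STRING,
  ETLDate       TIMESTAMP,
  PartitionList ARRAY<DATE>
);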
Let’s say we have DatasetName.DestinationTable, which has its own algorithm and needs to be calculated every hour. Its base table is DatasetName.BaseTableName. When the procedure is triggered at 02:00 PM, the script gets the last run date (01:00 PM) from Flag.DimNewDataFlag. The answer to “which partitions of DatasetName.BaseTableName have been updated since 01:00 PM” is stored in Flag.TablePartitionsFlag.
--==========================================
-- The last run date of procedure
--==========================================
set prLastRunDatetime = (select ETLDate
from Flag.DimNewDataFlag
where 1=1
and DWDataset = 'DatasetName'
and DWTable = 'DestinationTableName');
--==========================================
-- The updated partitions of base table
-- since the last run date of procedure
--==========================================
set prPartitionListtoScan = (select ARRAY_AGG(distinct PL)
from Flag.TablePartitionsFlag
,unnest(PartitionList) PL
where 1=1
and DWDataset = 'DatasetName'
and DWTable = 'BaseTableName'
and ETLDate >= prLastRunDatetime);
With this information, we are ready to go! When we use the prLastRunDatetime and prPartitionListtoScan parameters in the algorithm, only the relevant partitions are scanned. And if no data has been updated since 01:00 PM, the calculation run costs nothing at all.
--==========================================
-- Main algorithm: only the latest data
-- is taken into account
--==========================================
create temp table TemptoUpdate
partition by PartitionColumn as
select
... -- Calculation algorithm
,PartitionColumn
from DatasetName.BaseTableName
where 1=1
and UpdateDate >= prLastRunDatetime
-- UpdateDate is filled automatically on every record update
and PartitionColumn in unnest(prPartitionListtoScan);
-- Only related partitions are scanned.
As a result, a large share of our cost (more than 80% in our cases) is saved, depending on the algorithm, the table and the trigger frequency.
The whole script, combined:
CREATE OR REPLACE PROCEDURE ProjectName.DatasetName.spProcedureName()
begin
declare prgetdate int64;
declare prLastRunDatetime Timestamp;
declare prPartitionListtoScan ARRAY<DATE>;
declare prPartitionFilter ARRAY<DATE>;
declare prStartDate Timestamp;
--==========================================
-- Used for updating flag tables below.
-- Important to capture the date at the beginning
-- so that no data is missed on the next run
--==========================================
set prStartDate = CURRENT_TIMESTAMP();
--==========================================
-- The last run date of procedure
--==========================================
set prLastRunDatetime = (select ETLDate
from Flag.DimNewDataFlag
where 1=1
and DWDataset = 'DatasetName'
and DWTable = 'DestinationTableName');
--==========================================
-- The updated partitions of base table
-- since the last run date of procedure
--==========================================
set prPartitionListtoScan = (select ARRAY_AGG(distinct PL)
from Flag.TablePartitionsFlag
,unnest(PartitionList) PL
where 1=1
and DWDataset = 'DatasetName'
and DWTable = 'BaseTableName'
and ETLDate >= prLastRunDatetime);
--==========================================
-- Main algorithm: only the latest data
-- is taken into account
--==========================================
create temp table TemptoUpdate
partition by PartitionColumn as
select
... -- Calculation algorithm
,PartitionColumn
from DatasetName.BaseTableName
where 1=1
and UpdateDate >= prLastRunDatetime
-- UpdateDate is filled automatically on every record update
and PartitionColumn in unnest(prPartitionListtoScan);
-- Only related partitions are scanned.
--==========================================
-- Only updated partitions are scanned
-- into merge operation
--==========================================
set prPartitionFilter = (SELECT ARRAY_AGG(distinct PartitionColumn)
FROM TemptoUpdate);
Merge into DatasetName.DestinationTableName main
using TemptoUpdate tmp on main.TableId = tmp.TableId
AND main.PartitionColumn in UNNEST(prPartitionFilter)
-- Only related partitions are scanned.
when matched then ...
--==========================================
-- Flag update for next run
--==========================================
MERGE INTO Flag.DimNewDataFlag main
USING (SELECT 'DatasetName' as DWDataset
,'DestinationTableName' as DWTable
,prStartDate as ETLDate ) tmp
on main.DWDataset = tmp.DWDataset
and main.DWTable = tmp.DWTable
WHEN MATCHED THEN
UPDATE SET main.ETLDate = tmp.ETLDate
WHEN NOT MATCHED THEN
INSERT (DWDataset,DWTable,ETLDate)
VALUES (tmp.DWDataset,tmp.DWTable,tmp.ETLDate);
--==========================================
-- Updated partitions log
--==========================================
insert into Flag.TablePartitionsFlag
SELECT * FROM (
SELECT
'DatasetName' as DWDataset
,'DestinationTableName' as DWTable
,prStartDate as ETLDate
,prPartitionFilter AS PartitionList)
where PartitionList is not null;
end;
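Once created, the procedure can be triggered on whatever schedule the table needs, for example as an hourly scheduled query that simply calls it (ProjectName, DatasetName and spProcedureName are the placeholders from the script above):
CALL `ProjectName.DatasetName.spProcedureName`();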
In summary, the magic words “PartitionColumn in UNNEST(prPartitionFilter)” give us the opportunity to work with only the partitions of a table that are affected by new & updated data coming from the source, for both the data update and the data calculation processes. Moreover, calculation procedures that depend on the base table can now run independently of it, thanks to the flag mechanism.