Çiçek Sepeti Data Warehouse Architecture in Google BigQuery
www.ciceksepeti.com is an e-commerce website with various data layers. These individual data structures are stored in independent micro databases, so the need to analyze data located in different databases gives the Business Intelligence team an important responsibility. The data warehouse, built by the BI team, takes on the role of not only analyzing and reporting data but also handling complex calculations and feeding the source systems.
Beyond the reporting duty, these other responsibilities require us to update and calculate data as frequently as possible. Thus, our challenge was to build an optimized, fast, low-cost data warehouse in BigQuery.
Incidentally, we first designed our data warehouse in Microsoft SQL Server. The table, view and procedure structure is pretty much the same, but we made some major changes to how data is collected and updated in BigQuery. If you are interested in the data warehouse migration steps, please read about our harsh, challenging but instructive journey.
In this article, I will go into detail about:
- Data warehouse structure
- Collecting data
- Processing data
- Feeding outer systems
Data Warehouse Structure:
The Business Intelligence team works closely with the Data Engineering team on collecting data from the source. The general flow is:
- Source data is streamed into Google Cloud Storage
- Data is transferred from GCS to BigQuery
- In BigQuery, data is modeled, calculated and transformed
- The output is either sent to the source databases through API gateways or reported in Power BI, which is also designed by the BI team.
We first create an external table to reach the data in GCS, then transform it and take only the data needed for the update through an external view. In the merge procedure, we update our base table in an optimized way. In that phase, we also assign the data to dimension (definition data), fact (transactional data) or historical (definition data changing over time) tables, with clustering and partitioning where needed.
We also have some structural rules of our own, such as: referencing a table directly in any script is forbidden! Every table has its own view definition to be used instead. These views let us intervene in the pipeline without any structural change to the table.
These base views are used for processing complex calculations and for creating KPIs, Power BI report views and the GCS views needed to export data into GCS.
Moreover, we organize tables, procedures and views into datasets in BigQuery. Datasets are named after the source databases so that it is clear where each table's data comes from. KPI tables, views and procedures are kept in separate datasets named after the related project.
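As a hypothetical illustration of these conventions (only the naming pattern comes from this article, the definition itself is invented), a base table is only ever referenced through its view:
CREATE VIEW `ciceksepeti-project.Dataset.DimBranchView` AS
SELECT * FROM `ciceksepeti-project.Dataset.DimBranch`;
If the pipeline changes, extra columns or filters can be added to the view without touching the table itself or the scripts that use it.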
Collecting Data:
The Data Engineering team creates the streaming pipelines, starting with the CDC connector, which reads a table's change log. Using Debezium, CDC logs are transferred from the source to Google Cloud Storage (GCS) in Avro format. Sometimes we cannot use log-based CDC because it would reduce source database performance. When that happens, data is streamed using the JDBC connector, which is a relatively primitive but easy method. If you want to go deeper into detail, please read this article.
In GCS, the Avro files are organized into date and hour folders based on the current time. These folders help us update data in BigQuery in a more optimized way.
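For illustration, a hypothetical bucket layout (the article does not show the exact structure) might look like:
gs://cdc-bucket/STBranch/2022/06/21/13/part-000.avro
gs://cdc-bucket/STBranch/2022/06/21/14/part-000.avro
Because date and hour folders are zero-padded, newer files always sort after older ones by path, which is what makes the lexicographic _FILE_NAME filter shown below possible.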
We select the data by creating an external table. Actually, we can create external tables automatically using Postman, but that is not the subject of this article. The main contribution of the BI team is in the external view:
- The JSON data is parsed and the necessary columns are selected.
- CDC stores every change log of an Id. We select only the latest change using RowNumber.
- When you run a select on an external table without any filter, all Avro files are scanned, which causes extra cost in the pay-as-you-go model. We know the old Avro files do not need to be included because they have already been processed. Hence, we store the last-used Avro file information in our flag table Flag.MergedTopics (FilePath and FileName). FilePath holds the GCS path of the last Avro file used in the previous update; FileName holds the names of those Avro files. When we use this information in the external view as a filter on the external table, only the assigned and newer file paths are scanned, and the Avro files that were already used are excluded (a sketch of this flag table appears after the list):
SELECT … FROM External...
WHERE _FILE_NAME >= (SELECT MAX(FilePath) FROM Flag.MergedTopics)
  AND _FILE_NAME NOT IN (SELECT FileName FROM Flag.MergedTopics)
- Depending on the data size, this reduces both the scan time and the cost of the external table by up to 90%.
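A minimal sketch of Flag.MergedTopics, assuming only the two columns named above (the article does not give its DDL):
CREATE TABLE Flag.MergedTopics (
  FilePath STRING,  -- GCS path of the last Avro file used in the previous update
  FileName STRING   -- name of an Avro file that has already been merged
);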
An example external view script is below:
CREATE VIEW ExtDataset.STBranchView AS
SELECT * EXCEPT (RowNumber)
FROM (
  SELECT
     stb.PKBranchId AS BranchId
    -- ... all other needed columns and transformations
    ,_FILE_NAME AS FileName
    ,CASE WHEN op = 'd' THEN TRUE ELSE FALSE END AS IsDeleted
    ,DW.GETDATE() AS ETLDate
    ,ROW_NUMBER() OVER (PARTITION BY PKBranchId
                        ORDER BY stb.Timest DESC, TimestSort ASC) AS RowNumber
  FROM (
    SELECT *
      ,_FILE_NAME
      ,CASE WHEN op = 'd' THEN before.PKBranchId ELSE after.PKBranchId END AS PKBranchId
      ,CASE WHEN op = 'd' THEN before.CreatedOn ELSE after.CreatedOn END AS CreatedOn
      ,CASE WHEN op = 'd' THEN before.Timest ELSE after.Timest END AS Timest
      ,CASE WHEN op = 'd' THEN 1 WHEN op = 'c' THEN 3 ELSE 2 END AS TimestSort
    FROM `ciceksepeti-project.ExtDataset.STBranch`
    WHERE _FILE_NAME >= (SELECT MAX(FilePath) FROM Flag.MergedTopics)
      AND _FILE_NAME NOT IN (SELECT FileName FROM Flag.MergedTopics)
  ) stb
) sub
WHERE sub.RowNumber = 1;
Now it is time to update our main table (ciceksepeti-project.Dataset.DimBranch) using the external view. The phases of our data update procedure are to:
- store external view data into a temporary table
- merge temporary table with the destination table
- update flag tables for next run
We managed to gather only the required data in the external view, but when we merge it with the main table, the destination table would otherwise be fully scanned. The “PartitionColumn in UNNEST(prPartitionFilter)” filter in the merge script lets us scan only the partitions to be updated. Depending on the data size of the destination table, we again reduce the scan cost by up to 90%.
An example script is below:
CREATE PROCEDURE `ciceksepeti-project.Dataset.spDimBranch`()
BEGIN
DECLARE prPartitionFilter ARRAY<DATE>;
DECLARE prStartDate Timestamp;
DECLARE prEndDate Timestamp;
DECLARE prBytesBilled INT64;
DECLARE DWDataset string;
DECLARE DWTable string;
DECLARE prExtTable string;
DECLARE prProcedureName string;
-- Parameters for Log tables
SET prStartDate = (SELECT `ciceksepeti-project`.DW.GETDATE());
SET DWDataset = 'Dataset';
SET DWTable = 'DimBranch';
SET prProcedureName = 'spDimBranch';
SET prExtTable = 'STBranch';
-- Storing external view data to temporary table (there may be new records uploading to GCS during the merge process below)
CREATE TEMP TABLE TmpExternalData
PARTITION BY PartitionColumn
CLUSTER BY BranchId
AS
SELECT *
FROM `ciceksepeti-project.ExtDataset.STBranchView`;
-- Partitions to be filtered
SET prPartitionFilter = (SELECT ARRAY_AGG(distinct PartitionColumn)
as PartitionFilter FROM TmpExternalData);
MERGE INTO `ciceksepeti-project.Dataset.DimBranch` main
USING TmpExternalData tmp ON main.BranchId = tmp.BranchId
  AND main.PartitionColumn IN UNNEST(prPartitionFilter)
WHEN MATCHED and tmp.IsDeleted = TRUE and tmp.Timest >= main.Timest THEN DELETE
WHEN MATCHED and tmp.IsDeleted = FALSE and tmp.Timest >= main.Timest
THEN
UPDATE SET
...
WHEN NOT MATCHED and tmp.IsDeleted = FALSE
THEN
INSERT (...)
VALUES (...);
-- Scanned avro file names are stored
CREATE OR REPLACE TABLE `ciceksepeti-project.Flag.DatasetFileNameList` AS
SELECT DISTINCT DWDataset,FileName
FROM TmpExternalData
UNION ALL
SELECT DWDataset,DWDataset;
SET prEndDate = (SELECT `ciceksepeti-project`.DW.GETDATE());
SET prBytesBilled = (SELECT @@script.bytes_billed);
call `ciceksepeti-project.DW.spPostMergeOperations`(DWDataset, DWTable, prProcedureName, prBytesBilled, prStartDate, prEndDate, prPartitionFilter, prExtTable);
END;
For detailed implementation, please visit our data update optimization solution.
In this way, each table's data can be updated as an individual pipeline, independent of the others. No matter how frequently we trigger the procedure, the total cost will not be higher than triggering it once a day, because only the necessary Avro files and only the required partitions of the destination table are scanned on each run.
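As an illustration (the article does not cover how the procedures are triggered), each pipeline boils down to calling its procedure, for example from a BigQuery scheduled query:
-- Hypothetical scheduled query running every hour
CALL `ciceksepeti-project.Dataset.spDimBranch`();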
Processing Data:
The Çiçek Sepeti BI Team is responsible for calculating many different KPIs. These outputs are used not only for reporting but also for feeding the source systems. We describe our calculation method as “continue where you left off”. Just as we integrate only new and updated data from the external table into the BigQuery base table, we also calculate only the data required for a KPI. This time, we have two different flag tables (sketched after the list):
- DimNewDataFlag stores the latest calculation start time for each KPI
- TablePartitionsFlag stores the partition update times for every table
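A minimal sketch of the two flag tables, with column names, types and dataset inferred from how the scripts below use them (the article does not give their DDL):
CREATE TABLE `ciceksepeti-project.Dataset.DimNewDataFlag` (
  DWDataset STRING,
  DWTable   STRING,
  ETLDate   TIMESTAMP           -- last calculation start time for the KPI
);

CREATE TABLE `ciceksepeti-project.Dataset.TablePartitionsFlag` (
  DWDataset     STRING,
  DWTable       STRING,
  ETLDate       TIMESTAMP,      -- when these partitions were updated
  PartitionList ARRAY<DATE>     -- partitions touched in that run
);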
Let's say we have a table ciceksepeti-project.Dataset.FactCallStatus that uses data from the base table ciceksepeti-project.Dataset.FactCalls. It has complex calculations and detects the customer service calls whose status needs action. The calculation runs once an hour. Let's assume the last calculation time was "2022-06-21 13:00:00", so it should be calculated again at 14:00. When it is triggered, DimNewDataFlag gives the last process time, which is 13:00.
-- Latest update date of FactCallStatus
set prLastRunDatetime = (select ETLDate
                         from DimNewDataFlag
                         where 1=1
                           and DWDataset = 'Dataset'
                           and DWTable = 'FactCallStatus');
Our requirement is to detect only the partitions of the base table ciceksepeti-project.Dataset.FactCalls that have been updated since "2022-06-21 13:00:00". We know the other partitions have no updated data and do not need to be included in the calculation.
-- Updated partitions of base table since latest update date of FactCallStatus
set prCallPartitionList = (select ARRAY_AGG(distinct PL)
from TablePartitionsFlag
,unnest(PartitionList) PL
where 1=1
and DWDataset = 'Dataset'
and DWTable = 'FactCalls'
and ETLDate >= prLastRunDatetime);
If any partition of the base table has been updated, the main algorithm is triggered. In that case, only the chosen partitions are scanned by the calculation algorithm, reducing duration and cost by up to 90% (the exact rate, of course, depends on the data size and the complexity of the calculation). If not, the run is skipped to avoid unnecessary table scan cost.
--=======================
-- Algorithm
--=======================
-- Calculation is triggered only if there is any data to calculate
set prRowCount = (select count(*) from unnest(prCallPartitionList));
if prRowCount > 0 then
  create temp table TemptoUpdate
  partition by PartitionColumn
  as
  select
    ... -- KPI calculation
  from `ciceksepeti-project`.Dataset.FactCallsView
  where 1=1
    and ETLDate >= prLastRunDatetime
    and PartitionColumn in unnest(prCallPartitionList);

  set prPartitionFilter = (SELECT ARRAY_AGG(distinct PartitionColumn)
                           as PartitionFilter FROM TemptoUpdate);

  merge into `ciceksepeti-project`.Dataset.FactCallStatus main
  using TemptoUpdate tmp on main.Id = tmp.Id
    AND main.PartitionColumn in UNNEST(prPartitionFilter)
  when matched
  then
    update set
    ...
end if;
When the calculation is completed, the log tables are updated (the last run time moves from 2022-06-21 13:00:00 to 2022-06-21 14:00:00).
--=======================
-- Logs
--=======================
-- Update last run date of FactCallStatus
call `ciceksepeti-project.Dataset.spDimNewDataFlagDeleteInsert`
('Dataset', 'FactCallStatus', prStartDate);
-- Partition Log
insert into `ciceksepeti-project`.Dataset.TablePartitionsFlag
SELECT * FROM (
SELECT
'Dataset' as DWDataset
,'FactCallStatus' as DWTable
,prStartDate as ETLDate
,prPartitionFilter AS PartitionList)
where PartitionList is not null;
Unlike Microsoft SQL Server, BigQuery does not lock a table while reading or writing data. Hence, our “Collecting Data” and “Processing Data” phases can run independently thanks to our flag architecture, even though they seem to depend on each other.
Feeding Outer Systems:
Data warehouse outputs are used for:
- reporting
- serving as a source for the Data Science team
- feeding source databases using GCS and API
In our reporting phase, Power BI datasets pull data directly from Power BI views in BigQuery. To improve performance, we sometimes apply a procedure-table-view approach instead of a single view.
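A minimal, hypothetical sketch of that approach (names and calculation invented for the example): a procedure materializes the heavy query into a table, and the report view simply reads from that table:
-- Inside a scheduled procedure: materialize the expensive aggregation
CREATE OR REPLACE TABLE `ciceksepeti-project.Dataset.PBIBranchSales`
PARTITION BY PartitionColumn AS
SELECT BranchId, DATE(CreatedOn) AS PartitionColumn, COUNT(*) AS OrderCount
FROM `ciceksepeti-project.Dataset.FactSalesView`   -- hypothetical heavy source view
GROUP BY BranchId, PartitionColumn;

-- The Power BI dataset connects to a lightweight view on top of it
CREATE OR REPLACE VIEW `ciceksepeti-project.Dataset.PBIBranchSalesView` AS
SELECT * FROM `ciceksepeti-project.Dataset.PBIBranchSales`;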
Furthermore, Power BI's incremental refresh feature helps us process only the required partitions. That matters from a BigQuery perspective, because every Power BI dataset refresh incurs BigQuery cost. If the Power BI partitions are aligned with the BigQuery table partitions, the select cost decreases a great deal.
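For illustration, when the incremental refresh range filter is applied on the partition column, the query that reaches BigQuery is pruned to a handful of partitions, roughly like the hypothetical query below (dates invented, view taken from the sketch above):
SELECT *
FROM `ciceksepeti-project.Dataset.PBIBranchSalesView`
WHERE PartitionColumn >= DATE '2022-06-14'
  AND PartitionColumn <  DATE '2022-06-21';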
The Data Science team also gathers data from the data warehouse, including both base tables and calculated KPIs. To enable that, the BI team simply creates views in the data science project that select only the needed columns from the data warehouse project's views.
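For instance (project and column names hypothetical), such a view is nothing more than a thin projection across projects:
CREATE VIEW `datascience-project.Dataset.FactCallsView` AS
SELECT CallId, BranchId, CallDate, CallStatus
FROM `ciceksepeti-project.Dataset.FactCallsView`;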
Moreover, the data science team has some transactional projects that need to select data continuously. BigQuery is not a convenient environment for that requirement, so in this case the BI team exports (again, only new and updated) data from BigQuery to GCS and then to a MySQL database.
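A minimal sketch of such an export, assuming a hypothetical GCS export view, bucket and cutoff timestamp (the article does not show the export statement itself):
EXPORT DATA OPTIONS (
  uri = 'gs://hypothetical-export-bucket/fact_calls/*.avro',
  format = 'AVRO',
  overwrite = true
) AS
SELECT *
FROM `ciceksepeti-project.Dataset.FactCallsGCSView`    -- hypothetical GCS export view
WHERE ETLDate >= TIMESTAMP '2022-06-21 13:00:00';      -- only new & updated rows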
Some KPIs are used not only for reporting but also directly affect operations. For these cases, the outputs must be as accurate as possible and updated much more frequently.
We do not let other systems access BigQuery directly, except for Power BI, because it is the internal environment of the BI team. Instead, we export the calculated data to GCS and then trigger an API endpoint that pushes the data to destination systems such as Order, Product, Microsoft Axapta, etc.
Last but not least:
There is some confidential data, such as customer information, that should not be accessed by anyone except in certain cases. Hence, we separated everything, GCS buckets and BigQuery projects alike, with access limited to certain developers. But sometimes we need only the uniqueness of the confidential data, not the content itself. So we create a footprint version of the confidential column that has standard access and use it in the analysis.
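The article does not specify how the footprint is produced; one plausible way (an assumption on our part) is a one-way hash that preserves uniqueness while hiding the content:
SELECT
  TO_HEX(SHA256(LOWER(CustomerEmail))) AS CustomerEmailFootprint  -- hypothetical column; the hash keeps uniqueness, hides the value
FROM `ciceksepeti-project.Dataset.DimCustomer`;                   -- hypothetical table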
Of course, our data warehouse environment has many more features, optimized solutions for specific problems, query performance and cost tracking interfaces and so on. However, I tried to keep this article as simple as possible. Thanks for reading this far.
Contributions:
First of all, special thanks to the Data Engineering team, who always support us.
Dream team: Oyku Yesildag, Furkan Yusuf Pek, Songul Durmus, Irfan Dara, Ali Sarı, Eneeskalkan, Kaan Kerem Ilgaz and Umut Önder
We would be blind without our analysts: Selin Ertem, Yaren Gök
Our Product Manager, who makes time for us to keep optimizing: Caner Kırcaklı