Load Data from AWS RDS MySQL into Google BigQuery

Data Processing in the Cloud

One of our clients, a Big Data analytics firm, regularly queries gigabytes of data, so query performance directly affects the productivity of their team. These queries run non-stop for hours, with days of iteration on the business logic in between. The client asked us to migrate their data from a relational database, specifically MySQL, to a columnar database solution. An RDBMS like MySQL performs well for transactional operations, but it struggles when you run real-time analytics over large volumes of data. This is where Google BigQuery outshines similar solutions in the marketplace, with strong results in areas like cost (for all analytical scenarios), performance, and usability, especially at scale.

While Amazon Redshift and Google BigQuery provide much of the same functionality, there are fundamental differences in how the two operate, so you need to pick the right solution based on your data and business.

Once we decided which data warehouse best suited the use case, the delivery team had to clone the data from AWS RDS MySQL to Google BigQuery. Below, we outline how you can build a data pipeline to achieve this cloning between the two systems.

To begin, we used AWS Data Pipeline to export the MySQL data before feeding it into Google BigQuery. The entire workflow is shown in the image below:

AWS RDS and Google BigQuery

The pipeline is triggered on a predefined schedule. It automatically launches a spot instance that copies the data from the RDS MySQL database into CSV files, split by table name, in an Amazon S3 bucket, and sends an Amazon SNS notification when it finishes. At that point, the generated CSV files are available in our S3 bucket:

Data AWS S3 Bucket
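For reference, a pipeline like this can also be defined programmatically. The sketch below uses boto3's Data Pipeline API to outline the core objects (a daily schedule, a spot Ec2Resource, a CopyActivity, and an SnsAlarm); the IDs, field values, and ARN are illustrative placeholders rather than our actual configuration, and the MySQL and S3 data nodes are omitted for brevity.

# Illustrative sketch of defining a similar pipeline with boto3.
# Object IDs, field values, and the ARN are placeholders, not our real setup.
import boto3

dp = boto3.client("datapipeline")

pipeline_id = dp.create_pipeline(
    name="rds-mysql-to-s3", uniqueId="rds-mysql-to-s3"
)["pipelineId"]

def obj(obj_id, name, fields):
    """Build a pipeline object in the shape put_pipeline_definition expects."""
    return {"id": obj_id, "name": name, "fields": fields}

dp.put_pipeline_definition(
    pipelineId=pipeline_id,
    pipelineObjects=[
        # Run once a day on a schedule.
        obj("DailySchedule", "DailySchedule", [
            {"key": "type", "stringValue": "Schedule"},
            {"key": "period", "stringValue": "1 Day"},
            {"key": "startDateTime", "stringValue": "2019-01-01T00:00:00"},
        ]),
        # Spot instance that does the copy work.
        obj("SpotInstance", "SpotInstance", [
            {"key": "type", "stringValue": "Ec2Resource"},
            {"key": "instanceType", "stringValue": "m4.large"},
            {"key": "spotBidPrice", "stringValue": "0.05"},
            {"key": "terminateAfter", "stringValue": "2 Hours"},
        ]),
        # One CopyActivity per table: MySQL table in, CSV files in S3 out.
        obj("CopyOrders", "CopyOrders", [
            {"key": "type", "stringValue": "CopyActivity"},
            {"key": "input", "refValue": "OrdersTable"},
            {"key": "output", "refValue": "OrdersCsv"},
            {"key": "runsOn", "refValue": "SpotInstance"},
            {"key": "schedule", "refValue": "DailySchedule"},
            {"key": "onSuccess", "refValue": "NotifyDone"},
        ]),
        # SNS notification fired when the copy succeeds.
        obj("NotifyDone", "NotifyDone", [
            {"key": "type", "stringValue": "SnsAlarm"},
            {"key": "topicArn", "stringValue": "arn:aws:sns:eu-west-1:123456789012:export-done"},
            {"key": "subject", "stringValue": "RDS export finished"},
            {"key": "message", "stringValue": "CSV files are ready in S3."},
        ]),
        # The MySqlDataNode ("OrdersTable") and S3DataNode ("OrdersCsv")
        # objects are omitted here for brevity.
    ],
)

dp.activate_pipeline(pipelineId=pipeline_id)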

The SNS notification, in turn, triggers a Lambda function which submits a batch job based on a Docker image stored in our private Docker registry. The Docker container is responsible for copying the CSV files from the S3 bucket to GCS and ingesting the data into BigQuery.
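The Lambda function itself can stay small. Here is a minimal sketch, assuming an AWS Batch compute environment, job queue, and job definition (wrapping our Docker image) already exist; the queue and definition names are placeholders:

# Illustrative Lambda handler: fired by the SNS notification, it submits an
# AWS Batch job that runs the Docker image. Queue and job definition names
# are placeholders.
import boto3

batch = boto3.client("batch")

def handler(event, context):
    response = batch.submit_job(
        jobName="rds-to-bigquery-export",
        jobQueue="data-export-queue",          # assumed, pre-created job queue
        jobDefinition="rds-to-bigquery:1",     # assumed job definition wrapping the image
    )
    print("Submitted Batch job:", response["jobId"])
    return response["jobId"]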

Alternatively, you can use the Storage Transfer Service to migrate your data from Amazon S3 to Cloud Storage.
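As a rough sketch of that alternative, a transfer job can be created through the Storage Transfer API with the google-api-python-client library; the project, bucket names, and AWS keys below are placeholders:

# Illustrative: create a Storage Transfer Service job that copies an S3
# bucket into a GCS bucket. Project, bucket names and AWS keys are placeholders.
import googleapiclient.discovery

storagetransfer = googleapiclient.discovery.build("storagetransfer", "v1")

transfer_job = {
    "description": "Copy RDS CSV exports from S3 to GCS",
    "status": "ENABLED",
    "projectId": "my-gcp-project",
    "schedule": {
        # Same start and end date means the job runs once.
        "scheduleStartDate": {"year": 2019, "month": 1, "day": 1},
        "scheduleEndDate": {"year": 2019, "month": 1, "day": 1},
    },
    "transferSpec": {
        "awsS3DataSource": {
            "bucketName": "my-s3-export-bucket",
            "awsAccessKey": {
                "accessKeyId": "AKIA...",
                "secretAccessKey": "...",
            },
        },
        "gcsDataSink": {"bucketName": "my-gcs-export-bucket"},
    },
}

result = storagetransfer.transferJobs().create(body=transfer_job).execute()
print("Created transfer job:", result["name"])

In our setup, though, the Docker container does this work itself via the shell script below, which copies each table's CSV from S3 to GCS and then invokes the Python importer.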

#!/bin/bash

echo "🤖: Downloading BigQuery credentials..."

aws s3 cp s3://$GCP_AUTH_BUCKET/auth.json .
# The Python importer below authenticates with this service account key.
export GOOGLE_APPLICATION_CREDENTIALS=$PWD/auth.json

echo "🤖: Uploading CSV files to GCS..."

mkdir -p csv
rm -f tables

# Each table was exported to its own S3 "folder"; iterate over them.
for raw in $(aws s3 ls s3://$S3_BUCKET/ | awk '{print $2}');
do
    table=${raw%/}
    # Skip empty entries and non-table (df-*) folders.
    if [[ $table != "" && $table != df* ]]
    then
        echo "🤖: Table: $table"
        # Pick the most recent CSV export for this table.
        csv=$(aws s3 ls s3://$S3_BUCKET/$table/ | awk '{print $4}' | sort -r | head -n1)

        # Record the table name so bigquery_importer.py knows what to load.
        echo "$table" >> tables

        echo "🤖: CSV: $csv"

        echo "🤖: Copying CSV from S3"
        aws s3 cp s3://$S3_BUCKET/$table/$csv csv/$table.csv

        echo "🤖: Uploading CSV to GCS"
        gsutil cp csv/$table.csv gs://$GS_BUCKET/$table.csv
    fi
done

echo "🤖: Importing CSV files into Google BigQuery"

python bigquery_importer.py

Our team wrote a nifty Python script to clean up the raw data (encoding issues), map MySQL data types to BigQuery data types (data transformation), and load the CSV files into Google BigQuery:

import mysql.connector
import os
import time
from mysql.connector import Error
from google.cloud import bigquery

bigquery_client = bigquery.Client()

def mapToBigQueryDataType(columnType):
    """Map a MySQL column type to the closest BigQuery data type."""
    if columnType.startswith('int'):
        return 'INT64'
    if columnType.startswith('varchar'):
        return 'STRING'
    if columnType.startswith('decimal'):
        return 'FLOAT64'
    if columnType.startswith('datetime'):
        return 'DATETIME'
    if columnType.startswith('text'):
        return 'STRING'
    if columnType.startswith('date'):
        return 'DATE'
    if columnType.startswith('time'):
        return 'TIME'
    # Fall back to STRING for any type not mapped explicitly.
    return 'STRING'

def wait_for_job(job):
    """Poll the BigQuery load job until it finishes, raising on failure."""
    while True:
        job.reload()
        if job.state == 'DONE':
            if job.error_result:
                raise RuntimeError(job.errors)
            return
        time.sleep(1)

try:
    conn = mysql.connector.connect(host=os.environ['MYSQL_HOST'],
                                   database=os.environ['MYSQL_DB'],
                                   user=os.environ['MYSQL_USER'],
                                   password=os.environ['MYSQL_PWD'])
    if conn.is_connected():
        print('🤖: I am connected to the MySQL database.')

    # The 'tables' file is produced by the shell script above.
    lines = open('tables').read().split("\n")
    for tableName in lines:
        if not tableName:
            continue
        print('Table:', tableName)

        # Read the MySQL schema for this table and translate it to BigQuery fields.
        cursor = conn.cursor()
        cursor.execute('SHOW FIELDS FROM ' + os.environ['MYSQL_DB'] + '.' + tableName)
        rows = cursor.fetchall()

        schema = []
        for row in rows:
            schema.append(bigquery.SchemaField(row[0].replace('\'', ''),
                                               mapToBigQueryDataType(row[1])))

        job_config = bigquery.LoadJobConfig()
        job_config.source_format = bigquery.SourceFormat.CSV
        job_config.autodetect = True
        job_config.max_bad_records = 2
        job_config.allow_quoted_newlines = True
        job_config.schema = schema

        # Load the CSV that the shell script uploaded to GCS.
        job = bigquery_client.load_table_from_uri(
                'gs://' + os.environ['GCE_BUCKET'] + '/' + tableName + '.csv',
                bigquery_client.dataset(os.environ['BQ_DATASET']).table(tableName),
                location=os.environ['BQ_LOCATION'],
                job_config=job_config)

        print('🤖: Loading data to BigQuery:', tableName)

        wait_for_job(job)

        print('🤖: Loaded {} rows into {}:{}.'.format(
            job.output_rows, os.environ['BQ_DATASET'], tableName))

except Error as e:
    print(e)
finally:
    conn.close()

Below is an image of the imported tables in Google BigQuery:

Google BigQuery
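You can also verify the import programmatically. Here is a small sketch using the same google-cloud-bigquery client as the importer to list the tables in the destination dataset; it assumes the same BQ_DATASET environment variable:

# Illustrative check: list the tables that ended up in the destination dataset.
import os
from google.cloud import bigquery

client = bigquery.Client()
dataset_ref = client.dataset(os.environ['BQ_DATASET'])

for table in client.list_tables(dataset_ref):
    print(table.table_id)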

The solution worked, but we wanted to take it one step further. Fortunately for us, Google Cloud announced the public beta release of the BigQuery Data Transfer Service (BQDT), which automates the movement of data from endpoints like GCS or S3 into BigQuery on a schedule you specify. It proved to be the right fit in this instance for running the recurring import batch jobs from the Amazon S3 bucket into Google BigQuery.

BigQuery Data Transfer

One caveat is that BigQuery Data Transfer does not create the destination tables during the transfer process. We therefore had to write a Lambda function to drop the old dataset and create the destination tables with the appropriate schema in advance, before dispatching the transfer. The Lambda function is below:

package main

import (
	"bufio"
	"context"
	"database/sql"
	"fmt"
	"os"

	"cloud.google.com/go/bigquery"
	"github.com/aws/aws-lambda-go/lambda"
	_ "github.com/go-sql-driver/mysql"
)

// RemoveDataSet, CreateDataSet, GetColumns and CreateBQTable are helpers
// defined elsewhere in the project; only the handler is shown here.
func handler(ctx context.Context) error {
	client, err := bigquery.NewClient(ctx, os.Getenv("PROJECT_ID"))
	if err != nil {
		return err
	}

	// Drop the previous dataset and recreate it so the transfer starts clean.
	err = RemoveDataSet(client)
	if err != nil {
		return err
	}

	err = CreateDataSet(client)
	if err != nil {
		return err
	}

	// Connect to the RDS MySQL instance to read each table's schema.
	uri := fmt.Sprintf("%s:%s@tcp(%s)/%s",
		os.Getenv("MYSQL_USERNAME"), os.Getenv("MYSQL_PASSWORD"),
		os.Getenv("MYSQL_HOST"), os.Getenv("MYSQL_DATABASE"))

	db, err := sql.Open("mysql", uri)
	if err != nil {
		return err
	}
	defer db.Close()

	// The 'tables' file lists the tables exported by the data pipeline.
	file, err := os.Open("tables")
	if err != nil {
		return err
	}
	defer file.Close()

	scanner := bufio.NewScanner(file)
	for scanner.Scan() {
		tableName := scanner.Text()
		fmt.Println("Table:", tableName)

		// Map the MySQL columns to a BigQuery schema and create the table.
		columns, err := GetColumns(tableName, db)
		if err != nil {
			return err
		}
		fmt.Println("Columns:", columns)

		CreateBQTable(tableName, columns, client)
	}

	if err := scanner.Err(); err != nil {
		return err
	}
	return nil
}

func main() {
	lambda.Start(handler)
}

Finally, a CloudWatch Event rule triggers the Lambda function once the data pipeline finishes exporting the CSV files:

CloudWatch - Lambda Function
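One straightforward way to wire up such a trigger is a scheduled CloudWatch Events rule that fires shortly after the pipeline's export window. The sketch below uses boto3; the rule name, cron expression, function name, and ARNs are placeholders:

# Illustrative wiring: a scheduled CloudWatch Events rule that invokes the
# Lambda function. Rule name, schedule, and ARNs are placeholders.
import boto3

events = boto3.client("events")
lambda_client = boto3.client("lambda")

lambda_arn = "arn:aws:lambda:eu-west-1:123456789012:function:prepare-bq-dataset"

# Fire once a day, after the data pipeline's export window.
rule_arn = events.put_rule(
    Name="prepare-bq-dataset-daily",
    ScheduleExpression="cron(0 3 * * ? *)",
    State="ENABLED",
)["RuleArn"]

# Allow CloudWatch Events to invoke the function, then attach it as a target.
lambda_client.add_permission(
    FunctionName="prepare-bq-dataset",
    StatementId="allow-cloudwatch-events",
    Action="lambda:InvokeFunction",
    Principal="events.amazonaws.com",
    SourceArn=rule_arn,
)

events.put_targets(
    Rule="prepare-bq-dataset-daily",
    Targets=[{"Id": "prepare-bq-dataset", "Arn": lambda_arn}],
)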

To load the data from the AWS S3 bucket into the Google BigQuery tables, we created a transfer job for each table in BigQuery:

BigQuery Transfer Jobs
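The transfer configurations can also be created programmatically. Below is a minimal sketch assuming the google-cloud-bigquery-datatransfer Python client; the project, dataset, bucket path, table name, schedule, and AWS credentials are placeholders:

# Illustrative: create a scheduled BigQuery Data Transfer config that loads
# CSV files for one table from S3 into BigQuery. All identifiers are placeholders.
from google.cloud import bigquery_datatransfer

client = bigquery_datatransfer.DataTransferServiceClient()

transfer_config = bigquery_datatransfer.TransferConfig(
    destination_dataset_id="my_dataset",
    display_name="orders-from-s3",
    data_source_id="amazon_s3",
    schedule="every 24 hours",
    params={
        "data_path": "s3://my-s3-export-bucket/orders/*.csv",
        "destination_table_name_template": "orders",
        "file_format": "CSV",
        "access_key_id": "AKIA...",
        "secret_access_key": "...",
    },
)

created = client.create_transfer_config(
    parent=client.common_project_path("my-gcp-project"),
    transfer_config=transfer_config,
)
print("Created transfer config:", created.name)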

Google BigQuery can store hundreds of gigabytes (hopefully, terabytes soon) of data and analyse it in seconds, giving stakeholders business intelligence and data-driven insights.

Enjoyed what you read or learned something new? You can send us a message, feedback, or suggestions, or let us know how we can help your organisation build, secure, deploy and manage production-ready Serverless applications with AWS Lambda using Golang and Python.