Source: Citus Data Blog

Scaling Out MySQL with PostgreSQL and Citus

PostgreSQL is known for its great extensibility and powerful plugins. One particular category of extensions is Foreign Data Wrappers, or FDWs. FDWs let us interact directly from within Postgres with other data stores such as HDFS, columnar stores, MySQL, etc. Combined with Citus' scalability features, we can even leverage them to help us scale out those data stores where that might otherwise be quite difficult.

Imagine you have a very large and growing table in MySQL on which queries are taking longer and longer. Fortunately, Citus can solve this problem with the help of mysql_fdw. Before we get to the meat of it, we would like to thank Eugen Konkov for his interesting question on StackOverflow, which inspired this post. Note that this tutorial is experimental work, so feel free to try it at home, but use caution before taking it to production.

In this blog post we will see how PostgreSQL and Citus can help us scale out existing MySQL data. To do this, we will:

1. Create a MySQL table and fill it with some data.
2. Partition the MySQL table into smaller MySQL tables.
3. Create a distributed table in PostgreSQL with Citus.
4. Connect the distributed table's shards to the corresponding MySQL tables.
5. Run a query on the Citus master node and see that it correctly fetches the data from the MySQL tables in parallel.

The architecture that we will work on looks like this:

[Figure: architecture diagram]

We assume that you already have MySQL, PostgreSQL, Citus and mysql_fdw installed. For this demo, we will use the LINEITEM table from the standard TPC-H benchmark.
Let's create it in MySQL and fill it with some data:

mysql> CREATE TABLE LINEITEM (
  L_ORDERKEY INTEGER NOT NULL,
  L_PARTKEY INTEGER NOT NULL,
  L_SUPPKEY INTEGER NOT NULL,
  L_LINENUMBER INTEGER NOT NULL,
  L_QUANTITY DOUBLE PRECISION NOT NULL,
  L_EXTENDEDPRICE DOUBLE PRECISION NOT NULL,
  L_DISCOUNT DOUBLE PRECISION NOT NULL,
  L_TAX DOUBLE PRECISION NOT NULL,
  L_RETURNFLAG CHAR(1) NOT NULL,
  L_LINESTATUS CHAR(1) NOT NULL,
  L_SHIPDATE DATE NOT NULL,
  L_COMMITDATE DATE NOT NULL,
  L_RECEIPTDATE DATE NOT NULL,
  L_SHIPINSTRUCT CHAR(25) NOT NULL,
  L_SHIPMODE CHAR(10) NOT NULL,
  L_COMMENT VARCHAR(44) NOT NULL);
mysql> LOAD DATA LOCAL INFILE 'tpch_2_13_0/lineitem.tbl' INTO TABLE LINEITEM FIELDS TERMINATED BY '|';

Note: To create lineitem.tbl, download the TPC-H bundle and use its dbgen tool. For this demo, we generated the data with scale factor 10:

./dbgen -f -s 10 -T L

Now let's get some information about the data we have:

mysql> SELECT count(*) FROM LINEITEM;
+----------+
| count(*) |
+----------+
| 59986052 |
+----------+

We now have about 60 million rows, so partitioning this data by ship date ranges seems like a good idea. Let's find out which dates we should pick as range limits with the following simple cumulative counting query:

SET @running_total := 0;
SELECT totals.shipdate AS shipdate,
       (@running_total := @running_total + totals.shipcount) AS cumulative_count
FROM (SELECT l_shipdate AS shipdate, count(*) AS shipcount
      FROM LINEITEM
      GROUP BY shipdate
      ORDER BY shipdate) AS totals;

From the query output, we can pick 1993-04-07, 1994-05-13, 1995-06-18, 1996-07-23 and 1997-08-28 as range limits, which gives us six partitions.
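Picking the limits can also be automated. Below is a minimal Python sketch, assuming the per-date counts from the inner GROUP BY subquery above have already been fetched as a sorted list of (date, count) pairs; the helper names pick_boundaries and partition_ddl are hypothetical and not part of MySQL or Citus. Like the cumulative query, it walks a running total and starts a new partition once the rows seen so far reach the next multiple of total / N. It then renders half-open [lower, upper) CREATE TABLE ... AS SELECT statements, so every row lands in exactly one partition.

```python
from datetime import date


def pick_boundaries(daily_counts, num_partitions):
    """Choose num_partitions - 1 range limits from sorted (shipdate, count) pairs.

    Mirrors the cumulative counting query: a date becomes a limit once the
    rows *before* it reach the next multiple of total / num_partitions.
    """
    total = sum(count for _, count in daily_counts)
    target = total / num_partitions
    boundaries = []
    running_total = 0
    for shipdate, count in daily_counts:
        if (len(boundaries) < num_partitions - 1
                and running_total >= target * (len(boundaries) + 1)):
            boundaries.append(shipdate)
        running_total += count
    return boundaries


def partition_ddl(table, boundaries):
    """Render one CREATE TABLE ... AS SELECT per half-open l_shipdate range."""
    limits = [None] + [str(b) for b in boundaries] + [None]
    statements = []
    for i, (lower, upper) in enumerate(zip(limits, limits[1:]), start=1):
        conditions = []
        if lower is not None:
            conditions.append(f"l_shipdate >= '{lower}'")
        if upper is not None:
            conditions.append(f"l_shipdate < '{upper}'")
        where = " WHERE " + " AND ".join(conditions) if conditions else ""
        statements.append(f"CREATE TABLE {table}_{i} AS SELECT * FROM {table}{where};")
    return statements


if __name__ == "__main__":
    # Toy data (not TPC-H): six days with 10 rows each, split into three partitions.
    counts = [(date(1992, 1, d), 10) for d in range(1, 7)]
    limits = pick_boundaries(counts, 3)
    print(limits)
    for stmt in partition_ddl("LINEITEM", limits):
        print(stmt)
```

Passing the five limits picked above to partition_ddl("LINEITEM", ...) yields the same six CREATE TABLE statements used to partition the table. The hand-picked dates need not match what this equal-count heuristic would choose exactly.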
Let's create our partitions:

$ mysql
CREATE TABLE LINEITEM_1 AS SELECT * FROM LINEITEM WHERE l_shipdate < '1993-04-07';
CREATE TABLE LINEITEM_2 AS SELECT * FROM LINEITEM WHERE l_shipdate >= '1993-04-07' AND l_shipdate < '1994-05-13';
CREATE TABLE LINEITEM_3 AS SELECT * FROM LINEITEM WHERE l_shipdate >= '1994-05-13' AND l_shipdate < '1995-06-18';
CREATE TABLE LINEITEM_4 AS SELECT * FROM LINEITEM WHERE l_shipdate >= '1995-06-18' AND l_shipdate < '1996-07-23';
CREATE TABLE LINEITEM_5 AS SELECT * FROM LINEITEM WHERE l_shipdate >= '1996-07-23' AND l_shipdate < '1997-08-28';
CREATE TABLE LINEITEM_6 AS SELECT * FROM LINEITEM WHERE l_shipdate >= '1997-08-28';

We have now partitioned our data in MySQL. For this demonstration we are using a single MySQL server, but the partitions could just as well be placed on different MySQL servers. Now, let's head to Citus and create our distributed table.

First, let's tell PostgreSQL which foreign server we will use with mysql_fdw, and how the current role will connect to that foreign server:

$ psql
CREATE SERVER mysql_svr FOREIGN DATA WRAPPER mysql_fdw OPTIONS (host '127.0.0.1', port '3306');
CREATE USER MAPPING FOR eren SERVER mysql_svr OPTIONS (username 'root', password 'toor');

Then create the foreign table for LINEITEM and distribute it by range on l_shipdate:

CREATE FOREIGN TABLE LINEITEM (
  L_ORDERKEY INTEGER NOT NULL,
  L_PARTKEY INTEGER NOT NULL,
  L_SUPPKEY INTEGER NOT NULL,
  L_LINENUMBER INTEGER NOT NULL,
  L_QUANTITY DOUBLE PRECISION NOT NULL,
  L_EXTENDEDPRICE DOUBLE PRECISION NOT NULL,
  L_DISCOUNT DOUBLE PRECISION NOT NULL,
  L_TAX DOUBLE PRECISION NOT NULL,
  L_RETURNFLAG CHAR(1) NOT NULL,
  L_LINESTATUS CHAR(1) NOT NULL,
  L_SHIPDATE DATE NOT NULL,
  L_COMMITDATE DATE NOT NULL,
  L_RECEIPTDATE DATE NOT NULL,
  L_SHIPINSTRUCT CHAR(25) NOT NULL,
  L_SHIPMODE CHAR(10) NOT NULL,
  L_COMMENT VARCHAR(44) NOT NULL)
SERVER mysql_svr OPTIONS (dbname 'test', table_name 'LINEITEM');

SELECT master_create_distributed_table('LINEITEM', 'l_shipdate', 'range');

Now, let's create 6 shards that will correspond to the 6 MySQL tables:

SET citus.shard_replication_factor TO 1;

CREATE OR REPLACE FUNCTION master_create_range_shard(
    table_name text, minvalue text, maxvalue text)
RETURNS void LANGUAGE plpgsql AS $function$
DECLARE
  new_shard_id bigint;
BEGIN
  -- create an empty shard, then record its range in the Citus metadata
  SELECT master_create_empty_shard(table_name) INTO new_shard_id;
  UPDATE pg_dist_shard
     SET shardminvalue = minvalue, shardmaxvalue = maxvalue
   WHERE shardid = new_shard_id;
END;
$function$;

SELECT master_create_range_shard('LINEITEM', '1992-01-02', '1993-04-06');
SELECT master_create_range_shard('LINEITEM', '1993-04-07', '1994-05-12');
SELECT master_create_range_shard('LINEITEM', '1994-05-13', '1995-06-17');
SELECT master_create_range_shard('LINEITEM', '1995-06-18', '1996-07-22');
SELECT master_create_range_shard('LINEITEM', '1996-07-23', '1997-08-27');
SELECT master_create_range_shard('LINEITEM', '1997-08-28', '1998-12-01');

The shard ranges are inclusive on both ends, so each shard's maximum is the day before the next MySQL partition's lower limit. Let's see our shards:

SELECT * FROM pg_dist_shard WHERE logicalrelid = 'LINEITEM'::regclass;
 logicalrelid | shardid | shardstorage | shardalias | shardminvalue | shardmaxvalue
--------------+---------+--------------+------------+---------------+---------------
        16468 |  102008 | f            |            | 1992-01-02    | 1993-04-06
        16468 |  102009 | f            |            | 1993-04-07    | 1994-05-12
        16468 |  102010 | f            |            | 1994-05-13    | 1995-06-17
        16468 |  102011 | f            |            | 1995-06-18    | 1996-07-22
        16468 |  102012 | f            |            | 1996-07-23    | 1997-08-27
        16468 |  102013 | f            |            | 1997-08-28    | 1998-12-01
(6 rows)

We have finished configuring the master. Next, we need to connect to the workers and set which shard is associated with which MySQL partition table.
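Both the shard ranges registered above and the worker-side mapping follow mechanically from the partition limits: shard i covers [limit_i, limit_(i+1) minus one day] and reads from LINEITEM_(i+1). Here is a minimal Python sketch of that bookkeeping, assuming the shard IDs are passed in the same order as the ranges (they happen to be consecutive here, 102008 to 102013, but in general you would read them from pg_dist_shard). The first and last bounds are the earliest and latest ship dates in the TPC-H data. The helper names shard_plan and worker_commands are hypothetical; the lineitem_<shardid> and mysql_svr_<shardid> names follow the ones used in this post.

```python
from datetime import date, timedelta


def shard_plan(first_date, last_date, limits, shard_ids):
    """Pair each shard ID with its inclusive (min, max) range and its MySQL table."""
    shard_ids = list(shard_ids)
    mins = [first_date] + list(limits)
    # Range shards are inclusive, so each max is the day before the next min.
    maxes = [limit - timedelta(days=1) for limit in limits] + [last_date]
    if len(shard_ids) != len(mins):
        raise ValueError("need exactly one shard ID per range")
    return [
        (shard_id, lo, hi, f"LINEITEM_{i}")
        for i, (shard_id, lo, hi) in enumerate(zip(shard_ids, mins, maxes), start=1)
    ]


def worker_commands(plan, pg_role, mysql_user, mysql_password):
    """Render the ALTER TABLE and CREATE USER MAPPING statements to run on the workers."""
    alters = [
        f"ALTER TABLE lineitem_{shard_id} OPTIONS (SET table_name '{mysql_table}');"
        for shard_id, _, _, mysql_table in plan
    ]
    mappings = [
        f"CREATE USER MAPPING FOR {pg_role} SERVER mysql_svr_{shard_id} "
        f"OPTIONS (username '{mysql_user}', password '{mysql_password}');"
        for shard_id, _, _, _ in plan
    ]
    return alters + mappings


if __name__ == "__main__":
    limits = [date(1993, 4, 7), date(1994, 5, 13), date(1995, 6, 18),
              date(1996, 7, 23), date(1997, 8, 28)]
    plan = shard_plan(date(1992, 1, 2), date(1998, 12, 1), limits, range(102008, 102014))
    for shard_id, lo, hi, mysql_table in plan:
        print(shard_id, lo, hi, mysql_table)
    for cmd in worker_commands(plan, "eren", "root", "toor"):
        print(cmd)
```

With the limits chosen above, the (min, max) pairs match the pg_dist_shard output, and the generated statements match the worker commands used to wire each shard to its MySQL table.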
Let's run the following commands on the workers:

ALTER TABLE lineitem_102008 OPTIONS (SET table_name 'LINEITEM_1');
ALTER TABLE lineitem_102009 OPTIONS (SET table_name 'LINEITEM_2');
ALTER TABLE lineitem_102010 OPTIONS (SET table_name 'LINEITEM_3');
ALTER TABLE lineitem_102011 OPTIONS (SET table_name 'LINEITEM_4');
ALTER TABLE lineitem_102012 OPTIONS (SET table_name 'LINEITEM_5');
ALTER TABLE lineitem_102013 OPTIONS (SET table_name 'LINEITEM_6');

We also need to define the user mappings for the shards' foreign servers. Let's run the user mapping creation commands on the workers:

CREATE USER MAPPING FOR eren SERVER mysql_svr_102008 OPTIONS (username 'root', password 'toor');
CREATE USER MAPPING FOR eren SERVER mysql_svr_102009 OPTIONS (username 'root', password 'toor');
CREATE USER MAPPING FOR eren SERVER mysql_svr_102010 OPTIONS (username 'root', password 'toor');
CREATE USER MAPPING FOR eren SERVER mysql_svr_102011 OPTIONS (username 'root', password 'toor');
CREATE USER MAPPING FOR eren SERVER mysql_svr_102012 OPTIONS (username 'root', password 'toor');
CREATE USER MAPPING FOR eren SERVER mysql_svr_102013 OPTIONS (username 'root', password 'toor');

Now we're ready to see things in action. Let's reconnect to the master and run our distributed query. First, here is the same query run directly against the original MySQL table:

mysql> SELECT L_SHIPMODE, COUNT(*) FROM LINEITEM WHERE L_PARTKEY > 100 AND L_PARTKEY < 150 GROUP BY L_SHIPMODE;
+------------+----------+
| L_SHIPMODE | COUNT(*) |
+------------+----------+
| AIR        |      197 |
| FOB        |      203 |
| MAIL       |      213 |
| RAIL       |      228 |
| REG AIR    |      222 |
| SHIP       |      197 |
| TRUCK      |      215 |
+------------+----------+
7 rows in set (1 min 27.44 sec)

postgres=# SELECT L_SHIPMODE, COUNT(*) FROM LINEITEM WHERE L_PARTKEY > 100 AND L_PARTKEY < 150 GROUP BY L_SHIPMODE;
 l_shipmode | cou
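Conceptually, the master answers this GROUP BY by having each shard compute a partial count per ship mode over the rows it fetches from its MySQL table, and then adding those partial counts together. The Python sketch below models that split, assuming in-memory lists of (l_partkey, l_shipmode) tuples stand in for the six MySQL partitions and a thread pool stands in for the parallel shard queries; the function names are illustrative and not Citus internals.

```python
from collections import Counter
from concurrent.futures import ThreadPoolExecutor


def shard_partial_counts(rows, low, high):
    """Per-shard step: WHERE low < l_partkey < high, then COUNT(*) per l_shipmode."""
    return Counter(shipmode for partkey, shipmode in rows if low < partkey < high)


def distributed_count(shards, low, high):
    """Master step: query every shard in parallel, then sum the partial counts."""
    with ThreadPoolExecutor() as pool:
        partials = list(pool.map(lambda rows: shard_partial_counts(rows, low, high), shards))
    total = Counter()
    for partial in partials:
        total.update(partial)  # Counter.update adds counts instead of replacing them
    return dict(sorted(total.items()))


if __name__ == "__main__":
    # Toy rows, not TPC-H data: three "shards", one of them empty.
    shards = [
        [(101, "AIR"), (200, "AIR"), (120, "MAIL")],
        [(149, "AIR"), (150, "SHIP")],
        [],
    ]
    print(distributed_count(shards, 100, 150))
```

Because this query filters on l_partkey rather than on the distribution column l_shipdate, no shard can be ruled out, so all six shards are queried. A filter on l_shipdate would let Citus skip shards whose min/max range cannot match.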
