Tuesday, February 21, 2012

Set operations in Apache Pig


Simple set operation examples

While writing Apache Pig scripts, I realized that in many cases the result I was after was attainable through a series of set operations performed on various relations. It’s not very clear from the documentation how to perform these operations. I googled a bit and found this PDF from Tufts University on ‘Advanced Pig’. In a nutshell, COGROUP is your friend. Here are some simple examples that show how you can perform set operations in Pig using COGROUP.

Let’s assume we have 2 relations TEST1 and TEST2. We load TEST1 from test1.txt containing:


1,aaaa
2,bbbb
3,cccc
4,dddd
5,eeee
6,ffff
7,gggg
8,hhhh
9,iiii

TEST1 = LOAD 's3://mybucket/test1.txt' USING PigStorage(',') as (
id: chararray,
value: chararray);

We load TEST2 from test2.txt containing:

7,ggggggg
8,hhhhhhh
9,iiiiiii
10,jjjjjjj
11,kkkkkkk

TEST2 = LOAD 's3://mybucket/test2.txt' USING PigStorage(',') as (
id: chararray,
value: chararray);


We use COGROUP to generate a new relation. COGROUP is similar to JOIN in that it operates on one or more fields from each of its member relations. Here is how we cogroup on the id field of TEST1 and the id field of TEST2:

CGRP = COGROUP TEST1 BY id, TEST2 BY id;
DUMP CGRP;

(1,{(1,aaaa)},{})
(2,{(2,bbbb)},{})
(3,{(3,cccc)},{})
(4,{(4,dddd)},{})
(5,{(5,eeee)},{})
(6,{(6,ffff)},{})
(7,{(7,gggg)},{(7,ggggggg)})
(8,{(8,hhhh)},{(8,hhhhhhh)})
(9,{(9,iiii)},{(9,iiiiiii)})
(10,{},{(10,jjjjjjj)})
(11,{},{(11,kkkkkkk)})

If we DESCRIBE the new relation CGRP we get:

CGRP: {group: chararray,TEST1: {(id: chararray,value: chararray)},TEST2: {(id: chararray,value: chararray)}}

What is important to notice is that the second element of each tuple from the new relation is a bag of tuples from TEST1 containing the id value by which we cogrouped, and the third element of each tuple is a bag of tuples from TEST2 containing that same id value. These bags are empty if TEST1 or TEST2 do not contain a given id value. Based on this, we can perform the set operations I mentioned.

To perform set intersection (based on the id field), we keep only those tuples which have non-empty bags for both TEST1 and TEST2:


INTERSECT = FILTER CGRP BY NOT IsEmpty(TEST1) AND NOT IsEmpty(TEST2);
INTERSECT_ID = FOREACH INTERSECT GENERATE group AS id;   
DUMP INTERSECT_ID;

(7)
(8)
(9)


To perform the set difference TEST1 - TEST2 (based again on the id field), we keep only those tuples which have empty bags for TEST2 (which means those particular id values are in TEST1, but not in TEST2):

TEST1_MINUS_TEST2 = FILTER CGRP BY IsEmpty(TEST2);
TEST1_MINUS_TEST2_ID = FOREACH TEST1_MINUS_TEST2 GENERATE group AS id;
DUMP TEST1_MINUS_TEST2_ID;

(1)
(2)
(3)
(4)
(5)
(6)


The difference going the other way (TEST2 - TEST1) is similar. We keep only those tuples which have empty bags for TEST1:

TEST2_MINUS_TEST1 = FILTER CGRP BY IsEmpty(TEST1);
TEST2_MINUS_TEST1_ID = FOREACH TEST2_MINUS_TEST1 GENERATE group AS id;
DUMP TEST2_MINUS_TEST1_ID;

(10)
(11)


Note that if we wanted the set union based on the id field, we could simply generate the ‘group’ element of the CGRP relation:

UNION_ID = FOREACH CGRP GENERATE group AS id;
DUMP UNION_ID;

(1)
(2)
(3)
(4)
(5)
(6)
(7)
(8)
(9)
(10)
(11)


To perform the set intersection operation, we could also do a JOIN of TEST1 and TEST2 on the id field:

J = JOIN TEST1 BY id, TEST2 BY id;
DESCRIBE J;
DUMP J;

J: {TEST1::id: chararray,TEST1::value: chararray,TEST2::id: chararray,TEST2::value: chararray}

(7,gggg,7,ggggggg)
(8,hhhh,8,hhhhhhh)
(9,iiii,9,iiiiiii)


After the JOIN, we keep only the first field of the J relation (the id field):

J_ID = FOREACH J GENERATE $0;
DUMP J_ID;

(7)
(8)
(9)


To perform the set union operation, we could do a UNION of TEST1 and TEST2:

U = UNION TEST1, TEST2;
DESCRIBE U;
DUMP U;

U: {id: chararray,value: chararray}

(1,aaaa)
(2,bbbb)
(3,cccc)
(4,dddd)
(5,eeee)
(6,ffff)
(7,gggg)
(8,hhhh)
(9,iiii)
(7,ggggggg)
(8,hhhhhhh)
(9,iiiiiii)
(10,jjjjjjj)
(11,kkkkkkk)


However, note that the tuples containing common id values (7, 8 and 9) are duplicated at this point. So to generate the true set union, we need to keep only the distinct id values:

U_ID = FOREACH U GENERATE $0;
U_ID_DISTINCT = DISTINCT U_ID;
DUMP U_ID_DISTINCT;

(1)
(2)
(3)
(4)
(5)
(6)
(7)
(8)
(9)
(10)
(11)

More ‘real life’ set operation examples

The following examples are slightly more realistic. At least they’re based on real data -- the GeoWordNet datasets. As stated in this “Background Knowledge Datasets” document:

“A geo-spatial ontology is an ontology consisting of geo-spatial classes (e.g. lake, city), entities (e.g., Lago di Molveno, Trento), their metadata (e.g. latitude and longitude coordinates) and relations between them (e.g., part-of, instance-of). GeoWordNet is a multilingual geo-spatial ontology built from the full integration of WordNet, GeoNames and the Italian part of MultiWordNet.”

The GeoWordNet dataset contains several CSV files which can either be imported into a relational database or, in our case, loaded into Pig as relations:

concept = LOAD 's3://mybucket/geowordnet/concept.csv.gz' USING PigStorage(',') as (
         con_id: int,
         name: chararray,
         gloss: chararray,
         lang: chararray,
         provenance: chararray);

relation = LOAD 's3://mybucket/geowordnet/relation.csv.gz' USING PigStorage(',') as (
         src_con_id: int,
         trg_con_id: int,
         name: chararray,
         gloss: chararray,
         lang: chararray);

entity = LOAD 's3://mybucket/geowordnet/entity.csv.gz' USING PigStorage(',') as (
         entity_id: int,
         name: chararray,
         con_id: int,
         lang: chararray,
         latitude: chararray,
         longitude: chararray,
         provenance: chararray);

part_of = LOAD 's3://mybucket/geowordnet/part_of.csv.gz' USING PigStorage(',') as (
         src_entity_id: int,
         trg_entity_id: int);

alternative_name_eng = LOAD 's3://mybucket/geowordnet/alternative_name_eng.csv.gz' USING PigStorage(',') as (
         entity_id: int,
         name: chararray);

alternative_name_ita = LOAD 's3://mybucket/geowordnet/alternative_name_ita.csv.gz' USING PigStorage(',') as (
         entity_id: int,
         name: chararray);


Example 1

-- Find entities with both alternative English AND Italian names
COGRP1 = COGROUP alternative_name_eng BY entity_id, alternative_name_ita BY entity_id;
INTERSECT = FILTER COGRP1 BY NOT IsEmpty(alternative_name_eng) AND NOT IsEmpty(alternative_name_ita);
R1 = FOREACH INTERSECT GENERATE FLATTEN(alternative_name_eng), FLATTEN(alternative_name_ita);


Example 2

-- Find entities with alternative English names but with no alternative Italian names
COGRP2 = COGROUP alternative_name_eng BY entity_id, alternative_name_ita BY entity_id;
DIFF2 = FILTER COGRP2 BY IsEmpty(alternative_name_ita);
R2 = FOREACH DIFF2 GENERATE FLATTEN(alternative_name_eng);


Example 3

-- Find entities with alternative Italian names but with no alternative English names
COGRP3 = COGROUP alternative_name_ita BY entity_id, alternative_name_eng BY entity_id;
DIFF3 = FILTER COGRP3 BY IsEmpty(alternative_name_eng);
R3 = FOREACH DIFF3 GENERATE FLATTEN(alternative_name_ita);

Example 4

-- Find entities with alternative English OR Italian names
U = UNION alternative_name_eng, alternative_name_ita;
J = JOIN entity BY entity_id, U BY entity_id;
R4 = FOREACH J GENERATE entity::name, entity::con_id, entity::lang, entity::latitude, entity::longitude, U::name;

Example 5

-- Find entities with NO alternative English and NO Italian names (by doing set difference)
COGRP5 = COGROUP entity BY entity_id, U BY entity_id;
DIFF5 = FILTER COGRP5 BY IsEmpty(U);
R5 = FOREACH DIFF5 GENERATE FLATTEN(entity);


Although not strictly set-operation-related, here are some more things you can find out from the GeoWordNet dataset by means of JOINs between the appropriate relations:

-- Find relations between concepts
J1 = JOIN concept BY con_id, relation BY src_con_id;
J2 = JOIN J1 BY trg_con_id, concept BY con_id;
R6 = FOREACH J2 GENERATE J1::concept::con_id, J1::concept::name, J1::concept::gloss, J1::concept::lang, J1::concept::provenance, J1::relation::src_con_id, J1::relation::trg_con_id, J1::relation::name, J1::relation::gloss, J1::relation::lang, concept::con_id, concept::name, concept::gloss, concept::lang, concept::provenance;

-- Find entities which are part of other entities
J3 = JOIN entity BY entity_id, part_of BY src_entity_id;
J4 = JOIN J3 BY trg_entity_id, entity BY entity_id;
R7 = FOREACH J4 GENERATE J3::entity::name, J3::entity::con_id, J3::entity::lang, J3::entity::latitude, J3::entity::longitude, 'is part of', entity::name, entity::con_id, entity::lang, entity::latitude, entity::longitude;


Thursday, February 09, 2012

Handling date/time in Apache Pig

A common usage scenario for Apache Pig is to analyze log files. Most log files contain a timestamp of some sort -- hence the need to handle dates and times in your Pig scripts. I'll present here a few techniques you can use.

Mail server logs

The first example I have is a Pig script which analyzes the time it takes for a mail server to send a message. The script is available here as a gist.

We start by registering the piggybank jar and defining the functions we'll need. I ran this using Elastic MapReduce, and all these functions are available in the piggybank that ships with EMR.

REGISTER file:/home/hadoop/lib/pig/piggybank.jar;
DEFINE EXTRACT org.apache.pig.piggybank.evaluation.string.EXTRACT();             
DEFINE CustomFormatToISO org.apache.pig.piggybank.evaluation.datetime.convert.CustomFormatToISO();
DEFINE ISOToUnix org.apache.pig.piggybank.evaluation.datetime.convert.ISOToUnix();
DEFINE DATE_TIME org.apache.pig.piggybank.evaluation.datetime.DATE_TIME();
DEFINE FORMAT_DT org.apache.pig.piggybank.evaluation.datetime.FORMAT_DT();
DEFINE FORMAT org.apache.pig.piggybank.evaluation.string.FORMAT();


Since the mail log timestamps don't contain the year, we declare a variable called YEAR which by default is set to the current year via the Unix 'date' command. The variable can also be set when the Pig script is called by running "pig -p YEAR=2011 mypigscript.pig".

%default YEAR `date +%Y`;

We read in the mail logs and extract the lines containing the source of a given message ('from' lines). An example of such a line:

Dec  2 15:13:52 mailserver1 sendmail[1882]: pB2KCqu1001882: from=<info@example.com>, size=9544, class=0, nrcpts=1, msgid=<201112022012.pB2KCqu1001882@mailserver1.example.com>, proto=ESMTP, daemon=MTA, relay=relay1.example.com [10.0.20.6]

To split the line into its various elements, we use the EXTRACT function and a complicated regular expression. Note that in Pig the backslash needs to be escaped:

RAW_LOGS = LOAD '$INPUT' as (line:chararray);
SRC = FOREACH RAW_LOGS GENERATE                                                 
FLATTEN(                                                                         
 EXTRACT(line, '(\\S+)\\s+(\\d+)\\s+(\\S+)\\s+(\\S+)\\s+sendmail\\[(\\d+)\\]:\\s+(\\w+):\\s+from=<([^>]+)>,\\s+size=(\\d+),\\s+class=(\\d+),\\s+nrcpts=(\\d+),\\s+msgid=<([^>]+)>.*relay=(\\S+)')
)
AS (
 month: chararray,
 day: chararray,
 time: chararray,
 mailserver: chararray,
 pid: chararray,
 sendmailid: chararray,
 src: chararray,
 size: chararray,
 classnumber: chararray,
 nrcpts: chararray,
 msgid: chararray,
 relay: chararray
);

For this particular exercise we don't need all the fields of the SRC relation. We keep only a few:

T1 = FOREACH SRC GENERATE sendmailid, FORMAT('%s-%s-%s %s', $YEAR, month, day, time) as timestamp;
FILTER_T1 = FILTER T1 BY NOT sendmailid IS NULL;
DUMP FILTER_T1;

Note that we use the FORMAT function to generate a timestamp string out of the month, day and time fields, and we also add the YEAR variable. The FILTER_T1 relation contains tuples such as:

(pB2KDpaN007050,2011-Dec-2 15:13:52)
(pB2KDpaN007054,2011-Dec-2 15:13:53)
(pB2KDru1003569,2011-Dec-2 15:13:54)

We now use the DATE_TIME function, which takes as input our generated timestamp and the date format string describing it ('yyyy-MMM-d HH:mm:ss'), and returns a DateTime string in Joda-Time / ISO 8601 format.

R1 = FOREACH FILTER_T1 GENERATE sendmailid, DATE_TIME(timestamp, 'yyyy-MMM-d HH:mm:ss') as dt;
DUMP R1;

The R1 relation contains tuples such as:

(pB2KDpaN007050,2011-12-02T15:13:52.000Z)
(pB2KDpaN007054,2011-12-02T15:13:53.000Z)
(pB2KDru1003569,2011-12-02T15:13:54.000Z)

Note that the timestamp string "2011-Dec-2 15:13:52" got converted into a canonical ISO 8601 DateTime string "2011-12-02T15:13:52.000Z".

Now we can operate on the DateTime strings by using the ISOToUnix function, which takes a DateTime and returns the Unix epoch in milliseconds (which we divide by 1000 to obtain seconds):

-- ISOToUnix returns milliseconds, so we divide by 1000 to get seconds
toEpoch1 = FOREACH R1 GENERATE sendmailid, dt, ISOToUnix(dt) / 1000 as epoch:long;
DUMP toEpoch1;

The toEpoch1 relation contains tuples of the form:

(pB2KDpaN007050,2011-12-02T15:13:52.000Z,1322838832)
(pB2KDpaN007054,2011-12-02T15:13:53.000Z,1322838833)
(pB2KDru1003569,2011-12-02T15:13:54.000Z,1322838834)

We now perform similar operations on lines containing destination email addresses:

DEST = FOREACH RAW_LOGS GENERATE                                                 
FLATTEN(                                                                         
 EXTRACT(line, '(\\S+)\\s+(\\d+)\\s+(\\S+)\\s+(\\S+)\\s+sendmail\\[(\\d+)\\]:\\s+(\\w+):\\s+to=<([^>]+)>,\\s+delay=([^,]+),\\s+xdelay=([^,]+),.*relay=(\\S+)\\s+\\[\\S+\\],\\s+dsn=\\S+,\\s+stat=(.*)')
)
AS (
 month: chararray,
 day: chararray,
 time: chararray,
 mailserver: chararray,
 pid: chararray,
 sendmailid: chararray,
 dest: chararray,
 delay: chararray,
 xdelay: chararray,
 relay: chararray,
 stat: chararray
);


T2 = FOREACH DEST GENERATE sendmailid, FORMAT('%s-%s-%s %s', $YEAR, month, day, time) as timestamp, dest, stat;
FILTER_T2 = FILTER T2 BY NOT sendmailid IS NULL;

R2 = FOREACH FILTER_T2 GENERATE sendmailid, DATE_TIME(timestamp, 'yyyy-MMM-d HH:mm:ss') as dt, dest, stat;

-- ISOToUnix returns milliseconds, so we divide by 1000 to get seconds
toEpoch2 = FOREACH R2 GENERATE sendmailid, dt, ISOToUnix(dt) / 1000 AS epoch:long, dest, stat;

At this point we have 2 relations, toEpoch1 and toEpoch2, which we can join by sendmailid:

R3 = JOIN toEpoch1 BY sendmailid, toEpoch2 BY sendmailid;

The relation R3 will contain tuples of the form

(sendmailid, datetime1, epoch1, sendmailid, datetime2, epoch2, dest, stat)

We generate another relation by keeping the sendmailid, the delta epoch2 - epoch1, the destination email and the status of the delivery. We also order by the epoch delta:

R4 = FOREACH R3 GENERATE $0, $5 - $2, $6, $7;
R5 = ORDER R4 BY $1 DESC;

R5 contains tuples such as:

(pB2KDqo5007488,2,user1@earthlink.net,Sent (1rwzuwyl3Nl36v0 Message accepted for delivery))
(pB2KDru1003560,1,user2@yahoo.com,Sent (ok dirdel))
(pB2KCrvm030964,0,user3@hotmail.com,Sent ( <201112022012.pB2KCrvm030964> Queued mail for delivery))

At this point we can see which email deliveries took longest, and try to identify patterns (maybe certain mail domains make it harder to deliver messages, or maybe email addresses are misspelled, etc).

Nginx logs

In the second example, I'll show how to do some date conversions on Nginx access log timestamps. The full Pig script is available here as a gist.

We parse the Nginx access log lines similarly to the mail log lines in the first example:

RAW_LOGS = LOAD '$INPUT' as (line:chararray);
LOGS_BASE = FOREACH RAW_LOGS GENERATE                                            
FLATTEN(                                                                         
 EXTRACT(line, '(\\S+) - - \\[([^\\[]+)\\]\\s+"([^"]+)"\\s+(\\d+)\\s+(\\d+)\\s+"([^"]+)"\\s+"([^"]+)"\\s+"([^"]+)"\\s+(\\S+)')
)
AS (
 ip: chararray,
 timestamp: chararray,
 url: chararray,
 status: chararray,
 bytes: chararray,
 referrer: chararray,
 useragent: chararray,
 xfwd: chararray,
 reqtime: chararray
);
DATE_URL = FOREACH LOGS_BASE GENERATE timestamp;
F = FILTER DATE_URL BY NOT timestamp IS NULL;

The timestamp is of the form "30/Sep/2011:00:10:02 -0700" so we use the appropriate DATE_TIME formatting string 'dd/MMM/yyyy:HH:mm:ss Z' to convert it to an ISO DateTime. Note that we need to specify the timezone with Z:

R1 = FOREACH F GENERATE timestamp, DATE_TIME(timestamp, 'dd/MMM/yyyy:HH:mm:ss Z') as dt;
DUMP R1;

R1 contains tuples of the form:

(30/Sep/2011:00:19:35 -0700,2011-09-30T00:19:35.000-07:00)
(30/Sep/2011:00:19:36 -0700,2011-09-30T00:19:36.000-07:00)
(30/Sep/2011:00:19:37 -0700,2011-09-30T00:19:37.000-07:00)

At this point, if we wanted to convert from DateTime to Unix epoch in seconds, we could use ISOToUnix like we did for the mail logs:

toEpoch = FOREACH R1 GENERATE dt, ISOToUnix(dt) / 1000 as epoch:long;

However, let's use another function called FORMAT_DT to convert from the above DateTime format to another format of the type 'MM/dd/yyyy HH:mm:ss Z'. The first argument to FORMAT_DT is the desired format for the date/time, and the second argument is the DateTime string to convert:

FDT = FOREACH R1 GENERATE FORMAT_DT('MM/dd/yyyy HH:mm:ss Z', dt) as fdt;
DUMP FDT;

The FDT relation now contains tuples such as:

(09/30/2011 00:19:35 -0700)
(09/30/2011 00:19:36 -0700)
(09/30/2011 00:19:37 -0700)

We can now use a handy function called CustomFormatToISO to convert from any custom date/time format (such as the one we generated in FDT) back to a canonical ISO DateTime format:

toISO = FOREACH FDT GENERATE fdt, CustomFormatToISO(fdt, 'MM/dd/yyyy HH:mm:ss Z');
DUMP toISO;

(09/30/2011 00:19:35 -0700,2011-09-30T07:19:35.000Z)
(09/30/2011 00:19:36 -0700,2011-09-30T07:19:36.000Z)
(09/30/2011 00:19:37 -0700,2011-09-30T07:19:37.000Z)

Note how the custom DateTime string "09/30/2011 00:19:35 -0700" got transformed into the canonical ISO DateTime string "2011-09-30T07:19:35.000Z".

Converting Unix epoch to DateTime

Some log files have timestamps in Unix epoch format. If you want to transform them into DateTime, you can use the UnixToISO function:

DEFINE UnixToISO org.apache.pig.piggybank.evaluation.datetime.convert.UnixToISO();

Here is an input file:

$ cat unixtime.txt
1320777563
1320777763
1320779563
1320787563

And here is a Pig script which converts the epoch into DateTime strings. Note that UnixToISO expects the epoch in milliseconds, and our input is in seconds, so we have to multiply each input value by 1000 to get to milliseconds:

UNIXTIMES = LOAD 's3://mybucket.com/unixtime.txt' as (unixtime:long);
D = FOREACH UNIXTIMES GENERATE UnixToISO(unixtime * 1000);
DUMP D;

(2011-11-08T18:39:23.000Z)
(2011-11-08T18:42:43.000Z)
(2011-11-08T19:12:43.000Z)
(2011-11-08T21:26:03.000Z)

Tuesday, January 24, 2012

An ode to running a database on bare metal


No, my muse is not quite as strong as to inspire me to write an ode, but I still want to emphasize a few points about the goodness of running a database on bare metal.

At Evite, we use sharded MySQL for our production database. We designed the current architecture in 2009, when NoSQL was still very much in its infancy, so MySQL seemed a solid choice, a technology that we could at least understand. As I explained elsewhere, we do use MySQL in an almost non-relational way, and we sharded from the get-go, with the idea that it's better to scale horizontally than vertically.

We initially launched with the database hosted at a data center on a few Dell PE2970 servers, each with 16 GB of RAM and 2 quad-core CPUs. Each server was running 2 MySQL instances. We didn't get a chance to dark launch, but the initial load testing we did showed that we should be OK. However, there is nothing like production traffic to really stress test your infrastructure, and we soon realized that we had too few servers for the peak traffic we were expecting towards the end of the year.

We decided to scale horizontally in EC2, with one MySQL instance per m1.xlarge EC2 instance. At the time we also engaged Percona, and they helped us fine-tune our Percona XtraDB MySQL configuration so we could get the most out of the m1.xlarge horsepower. We managed to scale well enough for our high season in 2010, although we had plenty of pain points. We chose to use EBS volumes for our database files, because at the time EBS still gave people the illusion of stability and durability. We were very soon confronted with severe performance issues, manifested as CPU I/O wait times that were sometimes high enough to make the instance useless.

I described in a previous post how proficient we became at failing over from a master that went AWOL to a slave. Our issues with EBS volumes were compounded by the fact that our database access pattern is very write-intensive, and a shared medium such as EBS was far from ideal. Our devops team was constantly on the alert, and it seemed like we were always rebuilding instances and recovering from EC2 instance failures, although the end-user experience was not affected.

Long story short, we decided to bring the database back in-house, at the data center, on 'real' bare-metal servers. No virtualization, thanks. The whole process went relatively smoothly. One important point I want to make is that by then we already had a year's worth of hard numbers regarding the access patterns to our database, IOPS, MySQL query types, and so on. That made it easy to do proper capacity planning this time, in the presence of production traffic.

We started by buying 2 Dell C2100 servers, monster machines with dual Intel Xeon X5650 processors (for a total of 24 cores), 144 GB RAM, and 12 x 1 TB hard disks, out of which we prepared a 6 TB RAID 10 volume that we further divided into LVM logical volumes for specific types of MySQL files.

We put 2 MySQL instances on each server, and we engaged Percona again to help us fine-tune the configuration, this time including not only MySQL, but also the hardware and the OS. They were super helpful to us, as usual. Here are only some of the things they recommended, which we implemented:
  • set vm.swappiness kernel setting to 0 in /etc/sysctl.conf
  • set InnoDB flush method to O_DIRECT because we can rely on the RAID controller to do the caching (we also mounted XFS with the nobarrier option in conjunction with this change)
  • disable MySQL query cache, which uses a global mutex that can cause performance issues when used on a multi-core server
  • various other optimizations which were dependent on our setup, things like tweaking MySQL configuration options such as key_buffer_size and innodb_io_capacity
One important MySQL configuration option that we had to tweak was innodb_buffer_pool_size. If we set it too high, the server could start swapping. If we set it too low, disk I/O on the server could become a problem. Since we had 144 GB of RAM and we were running 2 MySQL instances per server, we decided to give each instance 60 GB of RAM. This proved to strike a good balance.

Once the fine-tuning was done, we directed production traffic away from 4 EC2 m1.xlarge instances to 2 x 2 MySQL instances, with each pair running on a C2100. We then sat back and wallowed for a while in the goodness of the I/O numbers we were observing. Basically, the servers were barely working. This is how life should be. 

We soon migrated all of our MySQL masters back into the data center. We left the slaves running in EC2 (still one m1.xlarge slave per MySQL master instance), but we changed them from being EBS-backed to using the local ephemeral disk in RAID 0 with LVM. We look at EC2 in this case as a secondary data center, used only in emergency situations.

One thing that bit us in our bare-metal setup was... a bare-metal issue with the LSI MegaRAID controllers. I already blogged about the problems we had with the battery relearning cycle, and with decreased performance in the presence of bad drives. But these things were easy to fix (thanks again to our friends at Percona for diagnosing these issues correctly in the first place).

I am happy to report that we went through our high season for 2011 without a glitch in this setup. Our devops team slept much better at night too! One nice thing about having EC2 as a 'secondary data center' is that, if need be, we can scale out horizontally by launching more EC2 instances. In fact, we doubled the number of MySQL slave instances for the duration of our high season, with the thought that if we needed to, we could double the number of shards at the application layer and thus scale horizontally that way. Fortunately we didn't have to, but we had that option -- a strategy which would otherwise be hard to pull off without a cloud presence, unless we bought a lot of extra capacity at the data center.

This brings me to one of the points I want to make in this post: it is a very valuable strategy to use the cloud to roll out a new architecture (one you designed from the get-go to be horizontally scalable) and to gauge its performance in the presence of real production traffic. You will get less than optimal performance per instance (because of virtualization vs. real hardware), but since you can scale horizontally, you should be able to sustain the desired level of traffic for your application. You will get hard numbers that will help you do capacity planning, and you will be able to bring the database infrastructure back to real hardware if you so wish, like we did. Note that Zynga has a similar strategy -- they roll out new games in EC2 and once they get a handle on how much traffic a game has, they bring it back into the data center (although it looks like they still use a private cloud and not bare metal).

Another point I want to make is that the cloud is not ready yet for write-intensive transactional databases, mainly because of the very poor I/O performance that you get on virtual instances in the cloud (compounded by shared network storage such as EBS). Adrian Cockcroft will reply that Netflix is doing just fine and they're exclusively in EC2. I hope they are doing just fine, and I hope his devops team is getting some good sleep at night, but I'm not sure. I need to perhaps qualify my point and say that the cloud is not ready for traditional transactional databases such as MySQL and PostgreSQL, which require manual sharding to be horizontally scalable. If I had to look at redesigning our database architecture today, I'd definitely try out HBase, Riak and maybe Cassandra. The promise there at least is that adding a new node to the cluster in these technologies is much less painful than in the manual sharding and scaling scenario. This still doesn't guarantee that you won't end up paying for a lot of instances to compensate for poor individual I/O per instance. Maybe a cloud vendor like Joyent with their SmartMachines will make a difference in this area (in fact, it is on our TODO list to test out their Percona SmartMachine).

Note however that there's something to be said about using good ol' RDBMS technologies. Ryan Mack says this in a Facebook Engineering post:

"After a few discussions we decided to build on four of our core technologies: MySQL/InnoDB for storage and replication, Multifeed (the technology that powers News Feed) for ranking, Thrift for communications, and memcached for caching. We chose well-understood technologies so we could better predict capacity needs and rely on our existing monitoring and operational tool kits."

The emphasis on the last sentence is mine. It's the operational aspect of a new architecture that will kill you first. With a well understood architecture, at least you have a chance to tame it.

Yet another point I'd like to make is: do not base your disaster recovery strategy in EC2 around EBS volumes, especially if you have a write-intensive database. It's not worth the performance loss, and most of all it's not worth the severe and unpredictable fluctuations in performance. In our experience it works much better to turn the ephemeral disks of an m1.xlarge EC2 instance into a RAID 0 array, put LVM on top of that, and use it for storing the various MySQL file types. We are then able to take LVM snapshots of that volume and upload the snapshots to S3. To build a new slave, we restore the snapshot from S3, then let replication catch up with the master. Works fine.

There you have it. An ode in prose to running your database on bare metal. Try it, you may sleep better at night!

Thursday, January 05, 2012

Graphing, alerting and mission control with Graphite and Nagios


We’ve been using Graphite more and more for graphing OS- and application-related metrics (here are some old-ish notes of mine on installing and configuring Graphite). We measure and graph variables as diverse as:
  • relative and absolute state of charge of the LSI MegaRAID controller battery (why? because we’ve been burned by battery issues before)
  • database server I/O wait time (critical for EC2 instances which are notorious for their poor I/O performance; at this point we only run MySQL slaves in EC2, and we do not repeat DO NOT use EBS volumes for DB servers, instead we stripe the local disks into a RAID 0 array with LVM)
  • memcached stats such as current connections, get hits and misses, delete hits and misses, evictions
  • percentage and absolute number of HTTP return codes as served by nginx and haproxy
  • count of messages in various queues (our own and Amazon SQS)
  • count of outgoing mail messages

We do have a large Munin install base, so we found Adam Jacob’s munin-graphite.rb script very useful in sending all data captured by Munin to Graphite.

Why Graphite and not Munin or Ganglia? Mainly because it’s so easy to send arbitrarily named metrics to Graphite, but also because we can capture measurements at 1 second granularity (although this is possible with some tweaking with RRD-based tools as well).

On the Graphite server side, we set up different retention policies depending on the type of data we capture. For example, for app server logs (nginx and haproxy) we have the following retention policy specified in /opt/graphite/conf/storage-schemas.conf:

[appserver]
pattern = ^appserver\.
retentions = 60s:30d,10m:180d,60m:2y


This tells Graphite we want to keep data aggregated at 60 second intervals for 30 days, 10 minute data for 6 months and hourly data for 2 years.

The main mechanism we use for sending data to Graphite is tailing various log files at different intervals, parsing the entries in order to extract the metrics we’re interested in, and sending those metrics to Graphite by the tried-and-true method called ‘just open a socket’.

For example, we tail the nginx access log file via a log tracker script written in Python (and run as a daemon with supervisor), and we extract values such as the timestamp, the request URL, the HTTP return code, bytes sent and the request time in milliseconds. The default interval for collecting these values is 1 minute. For HTTP return codes, we group the codes such as 2xx, 3xx, 4xx, 5xx together, so we can report on each type of return code. We aggregate the values per collection interval, then send the counts to Graphite, named something like appserver.app01.500.reqs, which represents the HTTP 500 error count on server app01.
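
For illustration, here is a minimal sketch of that 'just open a socket' approach in Python. The Graphite host name and the metric value below are placeholders, not our actual setup; Graphite's plaintext listener conventionally accepts lines of the form 'metric value timestamp' on port 2003.

import socket
import time

GRAPHITE_HOST = 'graphite.yourdomain.com'  # placeholder host
GRAPHITE_PORT = 2003  # Graphite's plaintext protocol port

def send_metric(path, value, timestamp=None):
    # Plaintext protocol: one line per metric -- "<path> <value> <unix timestamp>\n"
    if timestamp is None:
        timestamp = int(time.time())
    line = '%s %s %d\n' % (path, value, timestamp)
    sock = socket.create_connection((GRAPHITE_HOST, GRAPHITE_PORT))
    sock.sendall(line.encode('ascii'))
    sock.close()

# e.g. the HTTP 500 count aggregated on app01 during the last collection interval
send_metric('appserver.app01.500.reqs', 42)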

A more elegant way would be to use a tool such as logster to capture various log entries, but we haven’t had the time to write logster plugins for the 2 main services we’re interested in, nginx and haproxy. Our solution is deemed temporary, but as we all know, there’s nothing more permanent than a temporary solution.

For some of the more unusual metrics that we measure ourselves, such as the LSI MegaRAID battery charge state, we run a shell script in an infinite loop that produces a value every second and sends it to Graphite. To obtain the value we run something that resembles line noise:

$ MegaCli64 -AdpBbuCmd -GetBbuStatus -a0 | grep -i "Relative State of Charge" | awk '{print $5}'

(thanks to my colleague Marco Garcia for coming up with this)
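
We do this from a small shell script, but for illustration, a Python equivalent of that once-a-second loop might look like the sketch below. The MegaCli pipeline is the one above; the Graphite host and the metric name are placeholders.

import socket
import subprocess
import time

CMD = ("MegaCli64 -AdpBbuCmd -GetBbuStatus -a0 "
       "| grep -i 'Relative State of Charge' | awk '{print $5}'")
GRAPHITE = ('graphite.yourdomain.com', 2003)  # placeholder host, plaintext port

while True:
    # Run the pipeline above and grab the charge percentage it prints
    charge = subprocess.check_output(CMD, shell=True).decode().strip()
    line = 'db01.megaraid.bbu.relative_charge %s %d\n' % (charge, int(time.time()))
    sock = socket.create_connection(GRAPHITE)
    sock.sendall(line.encode('ascii'))
    sock.close()
    time.sleep(1)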

Once the data is captured in Graphite, we can do several things with it:
  • visualize it using the Graphite dashboards
  • alert on it using custom Nagios plugins
  • capture it in our own more compact dashboards, so we can have multiple graphs displayed on one ‘mission control’ page

Nagios plugins

My colleague Marco Garcia wrote a Nagios plugin in Python to alert on HTTP 500 errors. To obtain the data from Graphite, he queries a special ‘rawData’ URL of this form:

http://graphite.yourdomain.com/render/?from=-5minutes&until=-1minutes&target=asPercent(sum(appserver.app01.500.*.reqs),sum(appserver.app01.*.*.reqs))&rawData

which returns something like

asPercent(sumSeries(appserver.app01.500.*.reqs),sumSeries(appserver.app01.*.*.reqs)),1325778360,1325778600,60|value1,value2,value3,value4

where the 4 values represent the 4 data points, one per minute, from 5 minutes ago to 1 minute ago. Each data point is the percentage of HTTP 500 errors calculated against the total number of HTTP requests.

The script then compares the values returned with a warning threshold (0.5) and a critical threshold (1.0). If all values are greater than the respective threshold, we issue a warning or a critical Nagios alert.
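
Here is a simplified sketch of what such a check can look like -- not Marco's actual plugin. The Graphite URL mirrors the one above, the thresholds are the ones just mentioned, and the exit codes follow the Nagios convention of 0/1/2 for OK/WARNING/CRITICAL.

import sys
from urllib.request import urlopen

GRAPHITE_URL = ('http://graphite.yourdomain.com/render/?from=-5minutes&until=-1minutes'
                '&target=asPercent(sum(appserver.app01.500.*.reqs),'
                'sum(appserver.app01.*.*.reqs))&rawData')
WARN, CRIT = 0.5, 1.0

def get_datapoints(url):
    # rawData output looks like "<header>|v1,v2,v3,v4"; keep only the numeric values
    raw = urlopen(url).read().decode().strip()
    values = raw.split('|', 1)[1].split(',')
    return [float(v) for v in values if v not in ('', 'None')]

points = get_datapoints(GRAPHITE_URL)
if points and all(v > CRIT for v in points):
    print('CRITICAL: 500 error percentages over the last 4 minutes: %s' % points)
    sys.exit(2)
if points and all(v > WARN for v in points):
    print('WARNING: 500 error percentages over the last 4 minutes: %s' % points)
    sys.exit(1)
print('OK: 500 error percentages over the last 4 minutes: %s' % points)
sys.exit(0)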

Mission control dashboard

My colleague Josh Frederick came up with the idea of presenting all relevant graphs based on data captured in Graphite in a single page which he dubbed ‘mission control’. This enables us to make correlations at a glance between things such as increased I/O wait on DB servers and spikes in HTTP 500 errors.

To generate a given graph, Josh uses some JavaScript magic to come up with a URL such as:

http://graphite.yourdomain.com/render/?width=500&height=200&hideLegend=1&from=-60minutes&until=-0minutes&bgcolor=FFFFFF&fgcolor=000000&areaMode=stacked&title=500s%20as%20%&target=asPercent(sum(appserver.app*.500.*.reqs),%20sum(appserver.app*.*.*.reqs))&_ts=1325783046438

which queries Graphite for the percentage of HTTP 500 errors from all HTTP requests across all app servers for the last 60 minutes. The resulting graph looks like this:



Our mission control page currently has 29 such graphs.

We also have (courtesy of Josh again) a different set of charts based on the Google Visualization Python API. We get the data from Graphite in CSV format (by adding format=csv to the Graphite query URLs), then we display the data using the Google Visualization API.
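
For reference, here is a minimal sketch of the CSV-fetching half of that. The URL and target are placeholders, and I'm assuming Graphite's CSV output has one 'series,timestamp,value' row per data point; the Google Visualization rendering itself is left out.

import csv
from urllib.request import urlopen

# Placeholder query; format=csv makes Graphite return plain CSV rows
CSV_URL = ('http://graphite.yourdomain.com/render/?from=-60minutes'
           '&target=sum(appserver.app*.500.*.reqs)&format=csv')

rows = csv.reader(urlopen(CSV_URL).read().decode().splitlines())
# Keep (timestamp, value) pairs, skipping data points with empty values
datapoints = [(ts, float(v)) for series, ts, v in rows if v]
# 'datapoints' is what we then feed into a Google Visualization DataTable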

If you don’t want to roll your own JS-based dashboard talking to Graphite, there’s a tool called statsdash that may be of help.
