James Burkhart explains how Uber uses Apollo to support millions of analytical queries daily over real-time data. He covers the architectural decisions and lessons learned in building two pieces: an exactly-once ingest pipeline that stores raw events across in-memory row storage and on-disk columnar storage, and a custom metalanguage and query layer that leverages partial OLAP result set caching and query canonicalization. Together, these pieces give thousands of Uber employees analytical queries with subsecond p95 latency over hundreds of millions of recent events.
8. Motivation, Functionality Requirements
- Index based on data timestamp, not arrival timestamp
- Out of order and late (up to days later) arrival
- Mutability
- Sub-linear performance impact of scaling QPS
10. Environment Management
(MemSQL Cluster Sizes)
Datacenter 1 Datacenter 2
Production Prime: 33x 256GB
Production Prime 2: 43x 256GB
Production Minor: 5x 256GB
Production Minor 2: 7x 256GB
Staging/Preprod: 25x 256GB
mirrored
14. Ingestion
● Upserts - No double counting!
● Async RF=2 MemSQL replication
○ Can lose recent writes during hardware failure
● Solution -> every 6 hours, upsert the last 72h worth of data in batch from Hive
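The combination of upserts and a periodic batch repair can be sketched as follows. This is a minimal illustration, not the pipeline from the talk: the event schema, the `upsert` and `backfill` helpers, and the in-memory dictionary standing in for a MemSQL table are all assumptions made for the example.

```python
from typing import Dict, Iterable, List, Tuple

# Hypothetical event schema: (event_id, event_time_ms, fare).
Event = Tuple[str, int, float]


def upsert(table: Dict[str, Event], events: Iterable[Event]) -> None:
    """Insert-or-overwrite by primary key, so replays never add duplicate rows."""
    for event in events:
        table[event[0]] = event


def backfill(table: Dict[str, Event], batch_source: List[Event],
             now_ms: int, window_ms: int = 72 * 3600 * 1000) -> None:
    """Re-upsert every batch event whose timestamp falls in the trailing window."""
    recent = [e for e in batch_source if e[1] >= now_ms - window_ms]
    upsert(table, recent)


HOUR = 3600 * 1000
now = 100 * HOUR
# Stand-in for the batch copy of the data (Hive in the slides).
hive = [("a", now - 80 * HOUR, 5.0), ("b", now - 10 * HOUR, 7.5), ("c", now - 1 * HOUR, 3.0)]

table: Dict[str, Event] = {}
upsert(table, hive[:2])     # the streaming path delivered "a" and "b"; "c" was lost
upsert(table, hive[1:2])    # a redelivery of "b" does not double count
backfill(table, hive, now)  # the periodic batch repair restores "c"
total_fare = sum(e[2] for e in table.values())
```

Because every write is keyed by event id, the backfill can overlap freely with data that already arrived through the streaming path.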
20. Why SQL is hard for time series OLAP
Field Value
Dimension.SQLExpression request_at
Dimension.TimeBucketizer day
Dimension.TimeUnit millisecond
Timezone America/Los_Angeles
21. Why SQL is hard for time series OLAP
● Date/time functions:
○ ROUND(UNIX_TIMESTAMP(CONVERT_TZ(DATE_FORMAT(CONVERT_TZ(FROM_UNIXTIME(((trips.request_at) - (trips.request_at) % 900000) / 1000), 'GMT', 'America/Los_Angeles'), '%Y-%m-%d'), 'America/Los_Angeles', 'UTC')) / 0.001, 0)
○ Cheap timestamp snapping to 15m
○ Conversion from milliseconds to seconds
○ Conversion from Unix timestamp to SQL time
○ Adding timezone to Unix time
○ Date/time formatting/truncation
○ Timezone conversion
○ Conversion from SQL time to Unix timestamp
○ Conversion from seconds to milliseconds
Field Value
Dimension.SQLExpression request_at
Dimension.TimeBucketizer day
Dimension.TimeUnit millisecond
Timezone America/Los_Angeles
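For comparison, the chain of conversions in the SQL expression above can be written out step by step. The sketch below is only an illustration of what the expression computes (the epoch-millisecond start of the local calendar day); the function name `bucket_start_ms` is hypothetical, and it assumes Python 3.9+ with time zone data available to `zoneinfo`.

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo


def bucket_start_ms(request_at_ms: int, tz_name: str) -> int:
    """Return the epoch-millisecond start of the local calendar day of the event."""
    # Cheap timestamp snapping to 15 minutes (900000 ms).
    snapped_ms = request_at_ms - request_at_ms % 900_000
    # Milliseconds -> seconds -> a timezone-aware UTC datetime.
    utc_time = datetime.fromtimestamp(snapped_ms / 1000, tz=timezone.utc)
    # Move into the requested timezone, then truncate to the start of the day.
    local_time = utc_time.astimezone(ZoneInfo(tz_name))
    local_midnight = local_time.replace(hour=0, minute=0, second=0, microsecond=0)
    # Back to a Unix timestamp, then seconds -> milliseconds.
    return int(local_midnight.timestamp() * 1000)


# 2016-03-22 13:17 in Los Angeles is 20:17 UTC.
event_ms = int(datetime(2016, 3, 22, 20, 17, tzinfo=timezone.utc).timestamp() * 1000)
day_start_ms = bucket_start_ms(event_ms, "America/Los_Angeles")
```

Here `day_start_ms` corresponds to midnight local time on 2016-03-22, which is 07:00 UTC because Los Angeles is on daylight time on that date.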
22. Why SQL is hard for time series OLAP
● City/Region/Country based timezone
○ ROUND(UNIX_TIMESTAMP(CONVERT_TZ(DATE_FORMAT(CONVERT_TZ(FROM_UNIXTIME(((trips.request_at) - (trips.request_at) % 900000) / 1000), 'GMT', __tz__.sub_region_timezone), '%Y-%m-%d'), __tz__.sub_region_timezone, 'UTC')) / 0.001, 0) FROM trips JOIN api_cities as __tz__ ON trips.city_id = __tz__.id
○ Join with api_cities (which has timezone info of each level) on city_id
○ Use the corresponding timezone column from api_cities
Field Value
Dimension.SQLExpression request_at
Dimension.TimeBucketizer day
Dimension.TimeUnit millisecond
Timezone sub_region_timezone(city_id)
23. Why SQL is hard for time series OLAP
● #completed_trips / #requested_trips
○ SUM(CASE WHEN trips.status='completed' THEN 1 ELSE 0 END) / SUM(CASE WHEN trips.status!='ignored' THEN 1 ELSE 0 END)
○ SELECT …, _1.completed / _2.requested FROM (SELECT …, COUNT(*) as completed FROM trips WHERE status='completed' GROUP BY ...) AS _1 JOIN (SELECT …, COUNT(*) as requested FROM trips WHERE status!='ignored' GROUP BY ...) AS _2 ON ...
○ Filters make measures complex
Field Value
Measure[0].SQLExpression count(*)
Measure[0].Filters status='completed'
Measure[0].Alias completed
Measure[1].SQLExpression count(*)
Measure[1].Filters status!='ignored'
Measure[1].Alias requested
Measure[2].SQLExpression completed / requested
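One way a query layer could turn the measure fields above into SQL is sketched below. This is a hypothetical illustration, not the compiler described in the talk: the `Measure` type and the `compile_measure` and `compile_query` helpers are invented for the example, and the sketch only handles filtered `count(*)` measures by rewriting them as conditional sums.

```python
from typing import List, NamedTuple, Optional


class Measure(NamedTuple):
    alias: str
    sql_expression: str
    filters: Optional[str] = None


def compile_measure(m: Measure) -> str:
    """Render one measure; a filtered count(*) becomes a conditional sum."""
    if m.filters is None:
        return f"{m.sql_expression} AS {m.alias}"
    if m.sql_expression.lower() != "count(*)":
        raise ValueError("this sketch only handles filtered count(*)")
    return f"SUM(CASE WHEN {m.filters} THEN 1 ELSE 0 END) AS {m.alias}"


def compile_query(table: str, measures: List[Measure], derived: str) -> str:
    """Compute the filtered measures in one inner pass, then the derived ratio."""
    inner = ", ".join(compile_measure(m) for m in measures)
    return f"SELECT {derived} FROM (SELECT {inner} FROM {table}) AS _m"


measures = [
    Measure("completed", "count(*)", "status='completed'"),
    Measure("requested", "count(*)", "status!='ignored'"),
]
sql = compile_query("trips", measures, "completed / requested")
print(sql)
```

Wrapping the filtered measures in an inner query lets the derived measure refer to them by alias, which a single flat `SELECT` list would not allow.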
24. Why SQL is hard for time series OLAP
● #Trips by geofence for geofence A, B and C
○ SELECT count(*), geofences.uuid FROM trips JOIN geofences ON geography_intersects(trips.request_point, geofences.shape) WHERE geofences.uuid IN (A, B, C) GROUP BY geofences.uuid
● Total #Trips for geofence A, B and C
○ SELECT count(*) FROM trips JOIN geofences ON geography_intersects(trips.request_point, geofences.shape) WHERE geofences.uuid IN (A, B, C)
● Overlapping is OK, overcounting is not!
○ SELECT count(*) FROM trips WHERE EXISTS (SELECT * FROM geofences WHERE geography_intersects(trips.request_point, geofences.shape) AND geofences.uuid IN (A, B, C))
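The difference between the join-based total and the EXISTS-based total shows up as soon as two geofences overlap. The sketch below uses made-up axis-aligned rectangles and points in place of real geofence shapes and `geography_intersects`; all names and data are assumptions for illustration.

```python
from typing import Dict, List, Tuple

Point = Tuple[float, float]
Fence = Tuple[float, float, float, float]  # (x_min, y_min, x_max, y_max)


def contains(fence: Fence, point: Point) -> bool:
    """Simplified stand-in for geography_intersects on a rectangle."""
    x_min, y_min, x_max, y_max = fence
    x, y = point
    return x_min <= x <= x_max and y_min <= y <= y_max


def trips_per_geofence(trips: List[Point], fences: Dict[str, Fence]) -> Dict[str, int]:
    """JOIN ... GROUP BY geofences.uuid: a trip counts once per fence it falls in."""
    return {uuid: sum(contains(f, t) for t in trips) for uuid, f in fences.items()}


def total_by_join(trips: List[Point], fences: Dict[str, Fence]) -> int:
    """JOIN without GROUP BY: counts (trip, fence) pairs, so overlaps are overcounted."""
    return sum(trips_per_geofence(trips, fences).values())


def total_by_exists(trips: List[Point], fences: Dict[str, Fence]) -> int:
    """WHERE EXISTS: each trip counts at most once, however many fences contain it."""
    return sum(any(contains(f, t) for f in fences.values()) for t in trips)


fences = {"A": (0, 0, 2, 2), "B": (1, 1, 3, 3), "C": (5, 5, 6, 6)}  # A and B overlap
trips = [(0.5, 0.5), (1.5, 1.5), (2.5, 2.5), (5.5, 5.5), (9, 9)]

per_fence = trips_per_geofence(trips, fences)
join_total = total_by_join(trips, fences)
exists_total = total_by_exists(trips, fences)
```

The trip at (1.5, 1.5) lies in both A and B, so the per-geofence breakdown legitimately counts it twice, while the overall total must count it once.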
25. Bad SQL queries
● SELECT count(*), request_at FROM trips GROUP BY request_at;
○ Time needs to be bucketized! Grouping by milliseconds makes no sense!
● SELECT count(*), fare_total FROM trips GROUP BY fare_total;
○ Some numeric values, such as fare, need to be bucketized (reported as histograms)!
● SELECT sum(fare_total) FROM trips, other_table WHERE trips.fare_total>1.0 AND other_table.foo='BAR';
○ The join condition is missing; a Cartesian product is bad!
26. AQL Query Optimization
Date/time function performance issue
● CONCAT(DATE_FORMAT(FROM_UNIXTIME((__d0__) / 1000), '%Y-%m-%d '), LPAD(3 * FLOOR(HOUR(FROM_UNIXTIME((__d0__) / 1000)) / 3), 2, '0'), ':00')
● Run for every row (trip)!
Two-stage aggregation
Single stage: request_at -> date/time function bucketization -> count(*)
Stage 1: request_at -> t - t % 15m -> count(*) as c
Stage 2: date/time function bucketization -> sum(c)
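The two-stage idea can be sketched in a few lines: first group rows by a cheap arithmetic snap to 15 minutes, then apply the expensive date/time function once per 15-minute group instead of once per row. The code below is an illustration under assumptions: `three_hour_label` is a stand-in for the 3-hour bucketizing expression, and it works in UTC so that 15-minute boundaries always line up with bucket boundaries.

```python
from collections import Counter
from datetime import datetime, timezone
from typing import Iterable

FIFTEEN_MIN_MS = 15 * 60 * 1000


def three_hour_label(ts_ms: int) -> str:
    """Stand-in for the expensive date/time function: 'YYYY-MM-DD HH:00', 3-hour buckets."""
    t = datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc)
    return f"{t:%Y-%m-%d} {3 * (t.hour // 3):02d}:00"


def single_stage(request_at: Iterable[int]) -> Counter:
    """Run the date/time function for every row, then count."""
    return Counter(three_hour_label(t) for t in request_at)


def two_stage(request_at: Iterable[int]) -> Counter:
    # Stage 1: cheap snap (t - t % 15m) and count(*) as c.
    stage1 = Counter(t - t % FIFTEEN_MIN_MS for t in request_at)
    # Stage 2: date/time function once per 15-minute group, then sum(c).
    stage2: Counter = Counter()
    for snapped, c in stage1.items():
        stage2[three_hour_label(snapped)] += c
    return stage2


base = 1458604800000  # 2016-03-22 00:00:00 UTC in milliseconds
rows = [base + i * 7 * 60_000 for i in range(200)]  # one trip every 7 minutes
slow = single_stage(rows)
fast = two_stage(rows)
groups_in_stage1 = len(Counter(t - t % FIFTEEN_MIN_MS for t in rows))
```

Both paths produce the same counts, but the second calls the date/time function once per 15-minute group rather than once per trip.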
27. Time Series Bucket Splitting
Now: 2016-03-22 13:17
From: this week To: now
Rollup: 2016-03-21 (partial week); 2016-03-22 13:15 (15m)
Split:
2016-03-21 (day)
2016-03-22 00:00 (hour)
2016-03-22 01:00 (hour)
... (hour)
2016-03-22 12:00 (hour)
2016-03-22 13:00 (15m)
2016-03-22 13:15 (minute)
2016-03-22 13:16 (minute)
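A greedy split of this kind can be sketched as follows: at each step, take the coarsest bucket that is aligned at the current position and still fits before the end of the range. This is an assumed reconstruction of the behaviour shown on the slide, not the actual implementation; the function name and granularity list are hypothetical, and the week is assumed to start on Monday 2016-03-21.

```python
from datetime import datetime, timedelta
from typing import List, Tuple

# Coarsest first; the names match the labels on the slide.
GRANULARITIES = [
    ("day", timedelta(days=1)),
    ("hour", timedelta(hours=1)),
    ("15m", timedelta(minutes=15)),
    ("minute", timedelta(minutes=1)),
]
EPOCH = datetime(1970, 1, 1)


def split_buckets(start: datetime, end: datetime) -> List[Tuple[datetime, str]]:
    """Cover [start, end) with the coarsest aligned bucket that fits at each step."""
    buckets = []
    cursor = start
    while cursor < end:
        for name, width in GRANULARITIES:
            aligned = (cursor - EPOCH) % width == timedelta(0)
            if aligned and cursor + width <= end:
                buckets.append((cursor, name))
                cursor += width
                break
        else:
            raise ValueError("range is not aligned to whole minutes")
    return buckets


week_start = datetime(2016, 3, 21)        # start of "this week"
now = datetime(2016, 3, 22, 13, 17)       # "Now" on the slide
buckets = split_buckets(week_start, now)
names = [name for _, name in buckets]
```

For the slide's example this yields one day bucket, thirteen hour buckets, one 15-minute bucket, and two minute buckets. Buckets that lie entirely in the past keep the same boundaries from one query to the next, which is presumably what makes them good candidates for the partial result caching mentioned in the abstract.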
29. AQL Query Optimization
Aggregate rollups
avg(x) = sum(x) / count(*)
Original function | Stage 1 | Stage 2 (rollup)
count | count | sum
sum | sum | sum
min | min | min
max | max | max
count distinct | distinct | count distinct
HyperLogLog
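The rollup rules in the table amount to keeping partial aggregates that can be merged. The sketch below illustrates this with a hypothetical `Partial` record; exact sets stand in for the distinct values, where a production system might use an approximate sketch such as HyperLogLog instead.

```python
from dataclasses import dataclass, field
from typing import Iterable, List, Set


@dataclass
class Partial:
    count: int = 0
    total: float = 0.0
    minimum: float = float("inf")
    maximum: float = float("-inf")
    distinct: Set[float] = field(default_factory=set)


def stage1(values: Iterable[float]) -> Partial:
    """Aggregate the raw rows of one bucket."""
    p = Partial()
    for v in values:
        p.count += 1
        p.total += v
        p.minimum = min(p.minimum, v)
        p.maximum = max(p.maximum, v)
        p.distinct.add(v)
    return p


def rollup(parts: Iterable[Partial]) -> Partial:
    """Stage 2: merge per-bucket partials without touching raw rows again."""
    out = Partial()
    for p in parts:
        out.count += p.count                       # count rolls up with sum
        out.total += p.total                       # sum rolls up with sum
        out.minimum = min(out.minimum, p.minimum)  # min rolls up with min
        out.maximum = max(out.maximum, p.maximum)  # max rolls up with max
        out.distinct |= p.distinct                 # distinct values are unioned, then counted
    return out


def avg(p: Partial) -> float:
    """avg(x) = sum(x) / count(*), computed only after the rollup."""
    return p.total / p.count


bucket_rows: List[List[float]] = [[1.0, 2.0, 2.0], [2.0, 5.0]]
merged = rollup(stage1(rows) for rows in bucket_rows)
distinct_count = len(merged.distinct)
naive_distinct = sum(len(stage1(rows).distinct) for rows in bucket_rows)
```

Adding up per-bucket distinct counts (`naive_distinct`) overcounts the value 2.0, which appears in both buckets; this is why count distinct needs the distinct values, or a mergeable sketch of them, carried through stage 1.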
31. Contracts
SELECT AVG(fare), ts_15m FROM trips WHERE time >= (now() - 1h) (where city=x) group by 15m(, city);
Time range: 1h | 24h | 21d (group by 24h)
(where city=x), p95: 50ms | 60ms | 70ms
For x in cities: (where city=x), sum: ~9s | ~10s | ~12s
group by city, p95: 200ms | ~1s | ~7s
34. Contracts
SELECT COUNT(1) FROM trips WHERE
City = 'San Francisco'
State = 'completed'
Product = 'Uber-X'
(City,State,Product),(City,State),(City,Product),(City),
(State),(State,Product),
(Product),
(∅)
Geographical Breakdowns:
World > North America > United States > US West > California > BayArea > SF
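The eight dimension combinations listed above are the power set of the three filter dimensions, so they can be enumerated mechanically. The sketch below is only an illustration; the function name is hypothetical, and how such combinations are actually used in the contracts is not shown in the slides.

```python
from itertools import combinations
from typing import List, Sequence, Tuple


def dimension_subsets(dimensions: Sequence[str]) -> List[Tuple[str, ...]]:
    """Every subset of the dimensions, largest first, ending with the empty set."""
    return [
        combo
        for size in range(len(dimensions), -1, -1)
        for combo in combinations(dimensions, size)
    ]


subsets = dimension_subsets(["City", "State", "Product"])
for subset in subsets:
    print(subset if subset else "(∅)")
```

With n dimensions there are 2^n combinations, so each additional dimension doubles the number of query shapes to account for.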
35. Contracts
SELECT COUNT(1) FROM trips WHERE GROUP BY
City = 'San Francisco'
State = 'completed'
Product = 'Uber-X'
(City,State,Product),(City,State),(City,Product),(City),
(State),(State,Product),
(Product),
(∅)
Geographical Breakdowns:
World > North America > United States > US West > California > BayArea > SF
42. Questions?
(PS: We’re hiring)
Uber Engineering Blog
eng.uber.com
Uber Open Source
uber.github.io
Uber Eng Twitter
twitter.com/ubereng
These slides
https://tinyurl.com/apollostrata msql.co/uberscale
Check out ‘Hoodie: Incremental processing on Hadoop at Uber’ Thursday 1:50-2:30 for the next Uber Strata presentation.