The Top K Problem (Heavy Hitters) commonly appears as a dashboard. Based on the popularity, or lack thereof, of certain products, we can make decisions to promote or discontinue them. Such decisions may not be straightforward. For example, if a product is unpopular, we may decide either to discontinue it to save the costs of selling it or to spend more resources promoting it to increase its sales.
The Top K Problem is a common topic in interviews when discussing analytics, or it may be a standalone interview question. It can take on endless forms. Some examples of the Top K Problem include:
- Top-selling or worst-selling products on an ecommerce app by volume (this question) or revenue.
- The most-viewed or least-viewed products on an ecommerce app.
- Most downloaded apps on an app store.
- Most watched videos on a video app like YouTube.
- Most popular (listened to) or least popular songs on a music app like Spotify.
- Most traded stocks on a trading platform like Robinhood or E*TRADE.
- Most forwarded posts on a social media app, such as the most retweeted Twitter tweets or most shared Instagram posts.
Requirements
A few things to consider for functional and non-functional requirements:
- In case of ties, high accuracy may not be important, so we can choose any item in a tie.
- Our system should be able to aggregate by certain specified intervals such as hour, day, week, or year.
- The use cases will influence the desired accuracy (and other requirements like scalability). What are the use cases of this information? What is the desired accuracy and desired consistency/latency? It will be resource-intensive to compute accurate volumes and ranking in real time. Perhaps we can have a Lambda architecture, so we have an eventually consistent solution that offers approximate sales volumes and rankings within the last few hours and accurate numbers for time periods older than a few hours.
- We can also consider trading off accuracy for higher scalability, lower cost, lower complexity, and better maintainability. We expect to compute a particular Top K list within a particular period at least hours after that period has passed, so consistency is not a concern.
- We can expect that generating a list will require many minutes.
- We can accept a solution that provides the approximate volumes and ranking of the top 10 products within the last few hours, and volumes and ranking of any arbitrary number of products for time periods older than a few hours, potentially up to years. It’s also fine if our solution can display more than 10 products.
- Do we need to show the sales counts on the Top K list or just the product sales rankings?
We will show both the rankings and counts.
- Do we need to consider events that occur after a sale? A customer may request a refund, an exchange for the same or different product(s), or a product may be recalled.
Let’s assume we can consider only the initial sales events and disregard subsequent events like disputes or product recalls.
- Let’s discuss scalability requirements. What is the sales transaction rate? What is the request rate for our Heavy Hitters dashboard? How many products do we have?
Assume 10 billion sales events per day (i.e., heavy sales transaction traffic). At 1 KB/event, the write rate is 10 TB/day. The Heavy Hitters dashboard will only be viewed by employees, so it will have a low request rate. Assume we have ~1M products.
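The scalability figures above can be checked with a quick back-of-envelope calculation. The sketch below only restates the assumed numbers (10 billion events per day, 1 KB per event) and derives average per-second rates; decimal units are assumed, and real traffic will have peaks well above the average.

```python
# Back-of-envelope capacity estimate using the figures assumed in the text.
EVENTS_PER_DAY = 10_000_000_000   # 10 billion sales events per day
BYTES_PER_EVENT = 1_000           # 1 KB per event (decimal units assumed)
SECONDS_PER_DAY = 24 * 60 * 60

bytes_per_day = EVENTS_PER_DAY * BYTES_PER_EVENT
terabytes_per_day = bytes_per_day / 1e12
avg_events_per_second = EVENTS_PER_DAY / SECONDS_PER_DAY
avg_megabytes_per_second = bytes_per_day / SECONDS_PER_DAY / 1e6

print(f"{terabytes_per_day:.0f} TB/day")
print(f"{avg_events_per_second:,.0f} events/s on average")
print(f"{avg_megabytes_per_second:,.0f} MB/s on average")
```

On average this works out to a little under 116,000 events per second, which is why the ingestion path needs to be partitioned across many hosts.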
We need to preprocess our data prior to computing these Top K lists. We should periodically perform aggregation and count the sales of our products, bucketing by hour, day, week, month, and year. Then we can perform these steps when we need a Top K list:
- If needed, sum the counts of the appropriate buckets, depending on the desired period. For example, if we need the Top K list of a period of one month, we simply use that month’s bucket. If we need a particular three-month period, we sum the counts of the one-month buckets of that period. This way, we can save storage by deleting events after we sum the counts.
- Sort these sums to obtain the Top K list.
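The two steps above can be sketched as follows. This is a minimal illustration, assuming each bucket is an in-memory dictionary of product ID to sales count; the function name `top_k` and the sample buckets are hypothetical. Ties are broken by product ID only to make the output deterministic, since any item in a tie is acceptable.

```python
from collections import Counter
import heapq

def top_k(buckets, k):
    """Sum per-product counts across buckets and return the k best sellers.

    `buckets` is an iterable of dicts mapping product ID -> sales count,
    one dict per pre-aggregated period (for example, one per month).
    """
    totals = Counter()
    for bucket in buckets:
        totals.update(bucket)  # Counter.update adds counts instead of replacing
    # Highest count first; ties broken by product ID for determinism.
    return heapq.nsmallest(k, totals.items(), key=lambda kv: (-kv[1], kv[0]))

# Hypothetical one-month buckets for a three-month period.
jan = {"A": 5, "B": 7}
feb = {"A": 1, "C": 9}
mar = {"B": 2, "C": 1}
quarter_top2 = top_k([jan, feb, mar], k=2)
print(quarter_top2)
```

Using a heap-based selection avoids fully sorting all ~1M products when only a small K is needed.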
We need to save the buckets because the sales can be very uneven. In an extreme situation, a product “A” may have 1M sales within a particular hour during a particular year, and 0 sales at all other times during that year, while sales of all other products may sum to far less than 1M total sales in that year. Product A will be in the Top K list of any period that includes that hour.
High-level architecture
We first consider Lambda architecture. Lambda architecture is an approach to handling massive quantities of data by using both batch and streaming methods. As shown in the figure below, our Lambda architecture consists of two parallel data processing pipelines and a serving layer that combines the results of these two pipelines:
- A streaming layer/pipeline that ingests events in real time from all data centers where sales transactions occur and uses an approximation algorithm to compute the sales volumes and rankings of the most popular products.
- A batch layer, or batch pipelines that run periodically (hourly, daily, weekly, and yearly) to compute accurate sales volumes and rankings. For our users to see the accurate numbers as they become available, our batch pipeline ETL job can contain a task to overwrite the results of the streaming pipeline with the batch pipeline’s whenever the latter are ready.
Following an EDA (Event Driven Architecture) approach, the sales backend service sends events to a Kafka topic, which can be used for all downstream analytics such as our Top K dashboard. Event Driven Architecture (EDA) uses events to trigger and communicate between decoupled services.
Aggregation service
An initial optimization we can make to our Lambda architecture is to do some aggregation on our sales events and pass these aggregated sales events to both our streaming and batch pipelines. Aggregation can reduce the cluster sizes of both our streaming and batch pipelines. Our streaming and batch pipelines both write to an RDBMS (SQL), which our dashboard can query with low latency. We can also use Redis if all we need is simple key-value lookups, but we will likely desire filter and aggregation operations for our dashboard and other future services.
The sales backend logs events (including sales events) to a shared logging service, which is the data source for our dashboard. Our aggregation service consumes sales events from our shared logging service, aggregates them, and flushes these aggregated events to our streaming pipeline and to HDFS. Our batch pipeline computes the counts from our HDFS data and writes them to the SQL batch_table. Our streaming pipeline computes the counts faster and less accurately than our batch pipeline and writes them to the SQL speed_table. Our dashboard uses a combination of data from batch_table and speed_table to generate the Top K lists.
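One way the dashboard might combine the two tables is to prefer batch counts for any hour the batch pipeline has already processed and fall back to speed counts otherwise. The sketch below assumes this policy and uses in-memory dictionaries as stand-ins for batch_table and speed_table; the table layout (hour bucket mapped to per-product counts) and the function name `combined_counts` are assumptions for illustration, not a prescribed schema.

```python
from collections import Counter

def combined_counts(batch_table, speed_table, hours):
    """Return per-product totals over the requested hour buckets.

    Both tables map an hour bucket -> {product_id: count}. For each hour,
    accurate batch counts are preferred; approximate speed counts are used
    only when the batch pipeline has not produced that hour yet.
    """
    totals = Counter()
    for hour in hours:
        if hour in batch_table:
            totals.update(batch_table[hour])
        else:
            totals.update(speed_table.get(hour, {}))
    return totals

# Hypothetical contents: the batch pipeline has finished hour 00 only.
batch_table = {"2024-01-01T00": {"A": 100, "B": 40}}
speed_table = {
    "2024-01-01T00": {"A": 97, "B": 42},
    "2024-01-01T01": {"A": 10, "C": 55},
}
totals = combined_counts(batch_table, speed_table, ["2024-01-01T00", "2024-01-01T01"])
top = totals.most_common(2)
print(top)
```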
Aggregating by product ID
For example, a raw sales event may contain fields like (timestamp, product ID) while an aggregated event may be of the form (product_id, start_time, end_time, count, aggregation_host_id). We can aggregate the events since their exact timestamps are unimportant. If certain time intervals are important (e.g., hourly), we can ensure that (start_time, end_time) pairs are always within the same hour.
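The aggregation described above can be sketched as follows, assuming timestamps are Unix seconds and that hourly boundaries are the important interval. The class name `AggregatedEvent` and the function `aggregate` are hypothetical; the fields mirror the aggregated event form given in the text. Grouping by (product ID, hour) guarantees that each (start_time, end_time) pair stays within a single hour.

```python
from dataclasses import dataclass

SECONDS_PER_HOUR = 3600

@dataclass
class AggregatedEvent:
    product_id: int
    start_time: int   # earliest raw timestamp in this group (Unix seconds)
    end_time: int     # latest raw timestamp in this group (Unix seconds)
    count: int
    aggregation_host_id: str

def aggregate(raw_events, host_id):
    """Group raw (timestamp, product_id) events by product and hour."""
    groups = {}
    for timestamp, product_id in raw_events:
        key = (product_id, timestamp // SECONDS_PER_HOUR)
        event = groups.get(key)
        if event is None:
            groups[key] = AggregatedEvent(product_id, timestamp, timestamp, 1, host_id)
        else:
            event.start_time = min(event.start_time, timestamp)
            event.end_time = max(event.end_time, timestamp)
            event.count += 1
    return list(groups.values())

# Hypothetical raw events; product 123 has sales on both sides of an hour boundary.
raw = [(3600, 123), (3700, 123), (7199, 123), (7200, 123), (3650, 152)]
aggregated = aggregate(raw, host_id="host-1")
for event in aggregated:
    print(event)
```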
Our aggregation service can partition by product ID, so each host is responsible for aggregating a certain set of IDs. For simplicity, we can manually maintain a map of (host ID, product ID) in a configuration file. The number of hosts will likely not exceed a few hundred or a few thousand, so the configuration file will be tiny. Each host can fetch the entire file. We do not need a solution with the low-latency read performance of a database.
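A minimal sketch of such a configuration file and lookup is shown below. The JSON layout is an assumption: each host is mapped to a half-open range of product IDs rather than to individual IDs, which is one way to keep the file tiny. The host names, ranges, and function names are hypothetical.

```python
import json

# Hypothetical config file contents: each host owns a half-open range
# [start, end) of product IDs.
CONFIG_JSON = """
{
  "host-1": [0, 500000],
  "host-2": [500000, 1000000]
}
"""

def load_host_map(text):
    """Parse the config into a list of (start, end, host_id) sorted by start."""
    raw = json.loads(text)
    return sorted((start, end, host) for host, (start, end) in raw.items())

def host_for_product(host_map, product_id):
    """Return the host responsible for product_id, or raise if unassigned."""
    for start, end, host in host_map:
        if start <= product_id < end:
            return host
    raise KeyError(f"no host assigned to product {product_id}")

host_map = load_host_map(CONFIG_JSON)
print(host_for_product(host_map, 123))
print(host_for_product(host_map, 500000))
```

With at most a few thousand hosts, a linear scan over the ranges is adequate; a binary search could replace it if the map grew.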
Aggregation process on a host
An aggregation host contains a hash table with key of product ID and value of count. It also does checkpointing on the Kafka topic that it consumes, writing checkpoints to Redis. The checkpoints consist of the IDs of the aggregated events. Each host repeatedly does the following:
- Consume an event from the topic.
- Update its hash table.
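The consume-and-update loop above can be sketched with an in-memory stand-in for a host. No real Kafka or Redis client is used here. The event shape (event_id, product_id), the class name `AggregationHost`, and the `drain` method are assumptions for illustration; `drain` simply hands the accumulated counts and the IDs to checkpoint to whatever step comes next.

```python
from collections import Counter

class AggregationHost:
    """In-memory stand-in for one aggregation host (no real Kafka or Redis)."""

    def __init__(self):
        self.counts = Counter()       # hash table: product ID -> count
        self.pending_event_ids = []   # IDs of events aggregated so far

    def consume(self, event):
        """Process one raw sales event of the assumed form (event_id, product_id)."""
        event_id, product_id = event
        self.counts[product_id] += 1
        self.pending_event_ids.append(event_id)

    def drain(self):
        """Return the current counts and checkpoint IDs, then reset both."""
        counts, ids = dict(self.counts), list(self.pending_event_ids)
        self.counts.clear()
        self.pending_event_ids.clear()
        return counts, ids

host = AggregationHost()
for event in [(1, 123), (2, 152), (3, 123)]:
    host.consume(event)
counts, checkpoint_ids = host.drain()
print(counts, checkpoint_ids)
```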
An aggregation host may flush its hash table with a set periodicity or when its memory is running out, whichever is sooner. A possible implementation of the flush process is as follows:
1. Produce the aggregated events to a Kafka topic that we can name “Flush.” If the aggregated data is small (e.g., a few MB), we can write it as a single event, consisting of a list of product ID aggregation tuples with the fields (“product ID,” “earliest timestamp,” “latest timestamp,” “number of sales”).
2. Using change data capture (CDC), each destination has a consumer that consumes the event and writes to it:
   a. Write the aggregated events to HDFS.
   b. Write a tuple checkpoint to Redis with the status “complete” (e.g., {“hdfs”: “1620540831, complete”}).
   c. Repeat steps 2a–b for the streaming pipeline.
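The flush process can be sketched with in-memory stand-ins: plain lists for the “Flush” topic, HDFS, and the streaming pipeline's input, and a dictionary for Redis. None of these are real client APIs. The checkpoint key layout (`destination:flush_id`) and the sample tuple values are assumptions chosen for illustration and differ from the example checkpoint format in the text.

```python
import json

flush_topic = []   # stand-in for the "Flush" Kafka topic
hdfs = []          # stand-in for HDFS
stream = []        # stand-in for the streaming pipeline's input
checkpoints = {}   # stand-in for Redis

def produce_flush_event(flush_id, tuples):
    """Step 1: publish one event holding all aggregation tuples."""
    flush_topic.append(json.dumps({"id": flush_id, "tuples": tuples}))

def consume_flush_events(destination_name, destination):
    """Step 2: write each flush event to a destination, then checkpoint it."""
    for message in flush_topic:
        event = json.loads(message)
        key = f"{destination_name}:{event['id']}"
        if checkpoints.get(key) == "complete":
            continue  # already written by an earlier run
        destination.append(event["tuples"])   # write the aggregated events
        checkpoints[key] = "complete"         # record the checkpoint

# Tuple fields: (product ID, earliest timestamp, latest timestamp, number of sales)
produce_flush_event(1620540831, [[123, 1620540831, 1620545831, 20]])
consume_flush_events("hdfs", hdfs)
consume_flush_events("stream", stream)   # same steps for the streaming pipeline
consume_flush_events("hdfs", hdfs)       # a retry is skipped by the checkpoint
print(checkpoints)
```

Because the flush event stays on the topic, a replacement consumer can pick it up and finish the writes without the aggregation hosts redoing their work.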
If we did not have this “Flush” Kafka topic and a consumer host failed while writing an aggregated event to a particular destination, the aggregation service would need to reaggregate those events.
If a host fails during step 1, another host can consume the flush event and perform the writes. If the host fails during step 2a, the write to HDFS may have succeeded or failed, and another host can read from HDFS to check if the write succeeded or if it needs to be retried. Reading from HDFS is an expensive operation. As a host failure is a rare event, this expensive operation will also be rare. If we are concerned with this expensive failure recovery mechanism, we can implement the failure recovery mechanism as a periodic operation to read all “processing” checkpoints between a minute and a few minutes old.
We should consider fault tolerance. Any write operation may fail. Any host in the aggregation service, Redis service, HDFS cluster, or streaming pipeline can fail at any time. There may be network problems that interrupt write requests to any host on a service. A write request may return a 200 response code even though a silent error occurred. Such events will cause the three services to be in an inconsistent state. Therefore, we write separate checkpoints for HDFS and for our streaming pipeline. The write event should have an ID, so the destination services may perform deduplication if needed.
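Deduplication by write ID at a destination can be sketched as follows. This is a minimal in-memory illustration, assuming the destination can remember which write IDs it has applied; the class name `DeduplicatingSink` and the ID format are hypothetical. A real destination would need to persist the seen IDs and bound their retention.

```python
class DeduplicatingSink:
    """Destination that ignores writes whose ID it has already applied."""

    def __init__(self):
        self.seen_ids = set()
        self.totals = {}   # product ID -> accumulated count

    def write(self, write_id, product_id, count):
        """Apply the write once; return False if it was a duplicate."""
        if write_id in self.seen_ids:
            return False
        self.seen_ids.add(write_id)
        self.totals[product_id] = self.totals.get(product_id, 0) + count
        return True

sink = DeduplicatingSink()
first = sink.write("flush-1", 123, 20)
retry = sink.write("flush-1", 123, 20)   # retried after an ambiguous response
print(first, retry, sink.totals)
```

With this in place, a sender that is unsure whether a write succeeded can safely retry it without inflating the counts.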