On Time Series Analysis and Big Data. Interview with Andrei Gorine
“Time series data” are sequential series of data about things that change over time, usually indexed by a timestamp. The world around us is full of examples. – Andrei Gorine
I have interviewed Andrei Gorine, Chief Technical Officer and McObject co-founder.
The main topics of the interview are time series analysis, “in-chip” analytics, efficient Big Data processing, and the STAC M3 Kanaga Benchmark.
Q1. Who is using time series analysis?
Andrei Gorine: “Time series data” are sequential series of data about things that change over time, usually indexed by a timestamp. The world around us is full of examples — here are just a few:
• Self-driving cars continuously read data points from the surrounding environment: distances, speed limits, etc. These readings are often collected in the form of time-series data, then analyzed and correlated with other measurements or onboard data (such as the current speed) so that the car can turn to avoid obstacles, slow down, or speed up.
• Retail industry point-of-sale systems collect data on every transaction and communicate that data to a back-end where it gets analyzed in real-time, allowing or denying credit, dispatching goods, and extending subsequent relevant retail offers. Every time a credit card is used, the information is put into a time series data store where it is correlated with other related data through sophisticated market algorithms.
• Financial market trading algorithms continuously collect real-time data on changing markets, run algorithms to assess strategies and maximize the investor’s return (or minimize loss, for that matter).
• Web services and other web applications instantly register hundreds of millions of events every second, and form responses through analyzing time-series data sets.
• Industrial automation devices collect data from millions of sensors placed throughout all sorts of industrial settings — plants, equipment, machinery, environment. Controllers run analysis to monitor the “health” of production processes, making instant control decisions, sometimes preventing disasters, but more often simply ensuring uneventful production.
Q2. Why is a columnar data layout important for time series analysis?
Andrei Gorine: Time-series databases have some unique properties dictated by the nature of the data they store. One of them is the simple fact that time-series data can accumulate quickly: trading applications, for example, can add millions of trade-and-quote (“TAQ”) elements per second, and sensors based on high-resolution timers generate piles and piles of data. In addition, time-series data elements are normally received by, and written into, the database in timestamp order, so elements with sequential timestamps are arranged linearly, next to each other on the storage media. Furthermore, a typical query for time-series data is an analytical query or aggregation based on the data’s timestamp (e.g. calculate the simple moving average, or the volume-weighted average price of a stock over some period). In other words, data requests often must access a massive number of elements (in real life, often millions of elements) of the same time series.
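A timestamp-ordered aggregation such as the simple moving average mentioned above can be computed in one sequential pass. The following is a minimal Python sketch, assuming the series already arrives sorted by timestamp and that the window is measured in elements rather than in time units. The function name is hypothetical and not part of any McObject API.

```python
from collections import deque
from typing import Iterable, Iterator


def simple_moving_average(values: Iterable[float], window: int) -> Iterator[float]:
    """Yield the SMA of every full window over a timestamp-ordered series.

    A running sum is kept, so each new element costs O(1) instead of
    re-summing the whole window.
    """
    if window <= 0:
        raise ValueError("window must be positive")
    buf: deque = deque()
    running = 0.0
    for v in values:
        buf.append(v)
        running += v
        if len(buf) > window:
            running -= buf.popleft()  # drop the element that left the window
        if len(buf) == window:
            yield running / window


prices = [10.0, 11.0, 12.0, 13.0, 14.0]
print(list(simple_moving_average(prices, 3)))  # [11.0, 12.0, 13.0]
```

Because the input is consumed strictly in order, the same loop works whether the values come from a list in memory or from a sequential scan over storage.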
The performance of a database query is directly related to the number of I/O calls required to fulfill the request: less I/O contributes to greater performance. A columnar data layout allows for significantly smaller working set sizes, because the per-column overhead is negligible compared to the per-row overhead. For example, given a conservative 20 bytes of per-row overhead, storing 4-byte measurements in the horizontal layout (one time-series entry per row) requires 6 times more space than in the columnar layout: each row consumes 24 bytes, whereas one additional element in a columnar layout requires just 4 bytes. Since less storage space is required to store time-series data in the columnar layout, fewer I/O calls are required to fetch any given amount of real (non-overhead) data. Another space-saving feature of the columnar layout is content compression: a columnar layout allows for far more efficient and algorithmically simpler compression algorithms (such as run-length encoding over a column). Lastly, a row-based layout stores many columns together. In other words, when the database run-time reads a row of data (or, more commonly, a page of rows), it is reading many columns. When analytics require only one column (e.g. to calculate an aggregate of a time series over some window of time), it is far more efficient to read pages of just that column.
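The space arithmetic and the run-length encoding idea above can be reproduced in a few lines. This minimal Python sketch takes the 20-byte row overhead and 4-byte value from the text and treats per-column overhead as zero. The helper names are hypothetical, and real engines use their own on-disk compression formats.

```python
from itertools import groupby
from typing import List, Tuple

ROW_OVERHEAD_BYTES = 20  # the "conservative" per-row overhead from the text
VALUE_BYTES = 4          # one 4-byte measurement per time-series entry


def row_layout_bytes(n: int) -> int:
    """Horizontal layout: one entry per row, each row paying the overhead."""
    return n * (ROW_OVERHEAD_BYTES + VALUE_BYTES)


def column_layout_bytes(n: int) -> int:
    """Columnar layout: per-column overhead ignored as negligible."""
    return n * VALUE_BYTES


def run_length_encode(column: List[int]) -> List[Tuple[int, int]]:
    """Collapse runs of identical adjacent values into (value, run_length) pairs."""
    return [(value, len(list(run))) for value, run in groupby(column)]


def run_length_decode(pairs: List[Tuple[int, int]]) -> List[int]:
    return [value for value, count in pairs for _ in range(count)]


n = 1_000_000
print(row_layout_bytes(n) / column_layout_bytes(n))  # 6.0

status = [0, 0, 0, 0, 1, 1, 0, 0, 0]
encoded = run_length_encode(status)
print(encoded)  # [(0, 4), (1, 2), (0, 3)]
assert run_length_decode(encoded) == status
```

Run-length encoding pays off on a column because adjacent values of the same attribute tend to repeat. In a row layout, neighboring bytes belong to different attributes and rarely form runs.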
The sequential pattern in which time-series data is stored and retrieved in columnar databases (often referred to as “spatial locality”) leads to a higher probability of preserving the content of various cache subsystems, including all levels of CPU cache, while running a query. Placing relevant data closer to processing units is vitally important for performance: L1 cache access is 3 times faster than L2 cache access, 10 times faster than L3 unshared line access and 100 times faster than access to RAM (i7 Xeon). In the same vein, the ability to use various vector operations, and the ever-growing set of SIMD (Single Instruction, Multiple Data) instruction sets in particular, contributes to speedy aggregate calculations. Examples include SIMD vector instructions, which operate on multiple values held in one large register at the same time (such as the SSE, or Streaming SIMD Extensions, instruction sets on Intel and the AltiVec instructions on PowerPC), as well as pipelining (i.e. computations inside the CPU that are done in stages), and much more.
Q3. What is “on-chip” analytics and how is it different from other data analytics?
Andrei Gorine: This is also referred to as “in-chip” analytics. The concept of pipelining has been successfully employed in computing for decades. Pipelining refers to a series of data processing elements in which the output of one element is the input of the next one. Instruction pipelines have been used in CPU designs since the introduction of RISC CPUs, and modern GPUs (graphics processors) pipeline various stages of common rendering operations. Elements of software pipeline optimization are also found in operating system kernels.
Time-series data layouts are perfect candidates for a pipelining approach. Operations (functions) over time-series data (in our product, time series are called “sequences”) are implemented through “iterators”. Iterators carry chunks of data relevant to the function’s execution (we call these chunks of data “tiles”). Sequence functions receive one or more input iterators, perform the required calculations and write the result to an output iterator. The output iterator, in turn, is passed into the next function in the pipeline as an input iterator, building a pipeline that moves data from the database storage through the set of operations up to the result set in memory. The “nodes” in this pipeline are operations, while the edges (“channels”) are iterators. The interim operation results are not materialized in memory. Instead, the “tiles” of elements are passed through the pipeline, where each tile is referenced by an iterator. The tile is the unit of data exchange between the operators in the pipeline. The tile size is small enough to keep the tile in the top-level L1 CPU cache and large enough to allow for efficient use of the superscalar and vector capabilities of modern CPUs. For example, 128 time-series elements fit into a 32K cache. Hence the term “on-chip” or “in-chip” analytics. As mentioned, top-level cache access is 3 times faster than level two (L2) cache access.
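The structure described above (operations as pipeline nodes, iterators as the channels, tiles as the unit of exchange) maps naturally onto Python generators. This is a minimal sketch under that analogy only. `scan`, `zip_with` and `total` are hypothetical names, not eXtremeDB's sequence API, and Python lists will not actually stay resident in L1 cache. The sketch shows the dataflow shape: a full interim sequence never exists.

```python
import operator
from typing import Callable, Iterator, List, Sequence

Tile = List[float]
TILE_SIZE = 128  # hypothetical default, echoing the 128-element tile mentioned above


def scan(sequence: Sequence[float], tile_size: int = TILE_SIZE) -> Iterator[Tile]:
    """Source node: read a stored sequence one tile at a time."""
    for start in range(0, len(sequence), tile_size):
        yield list(sequence[start:start + tile_size])


def zip_with(op: Callable[[float, float], float],
             left: Iterator[Tile], right: Iterator[Tile]) -> Iterator[Tile]:
    """Operation node: take one tile from each input iterator, emit one output tile."""
    for a, b in zip(left, right):
        yield [op(x, y) for x, y in zip(a, b)]


def total(tiles: Iterator[Tile]) -> float:
    """Sink node: fold tiles into one aggregate; no tile is kept after it is summed."""
    return sum(sum(tile) for tile in tiles)


price = [float(i % 7 + 100) for i in range(1000)]
volume = [float(i % 3 + 1) for i in range(1000)]

# The full price*volume sequence is never built: each product tile is summed
# and dropped before the next pair of input tiles is read.
notional = total(zip_with(operator.mul, scan(price), scan(volume)))
print(notional)
```

Each stage pulls only the next tile from its input when it needs it, so at any moment only a handful of tiles are alive, regardless of how long the underlying sequences are.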
To illustrate the approach, consider the operation x*y + z, where x, y and z are large sequences (vectors, if you will) of perhaps megabytes or even gigabytes. If the complete interim result of the first operation (x*y) is created, then by the time its last element is produced, the first element of the interim sequence has already been pushed out of the cache, so the second operation (+ z) has to load it from memory. Tile-based pipelining avoids this scenario.
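The x*y + z example can be made concrete by tracking the size of the interim buffer under both strategies. This minimal Python sketch uses buffer length as a stand-in for cache footprint; it does not measure real cache behavior. The function names and the tile size of 128 are hypothetical.

```python
from typing import List, Tuple


def materialized(x: List[float], y: List[float],
                 z: List[float]) -> Tuple[List[float], int]:
    """Compute all of x*y first, then add z; the interim buffer holds len(x) items."""
    xy = [a * b for a, b in zip(x, y)]  # full interim sequence
    result = [p + c for p, c in zip(xy, z)]
    return result, len(xy)


def tiled(x: List[float], y: List[float], z: List[float],
          tile_size: int) -> Tuple[List[float], int]:
    """Compute x*y + z one tile at a time; the interim buffer never exceeds tile_size."""
    result: List[float] = []
    largest_interim = 0
    for start in range(0, len(x), tile_size):
        end = start + tile_size
        xy_tile = [a * b for a, b in zip(x[start:end], y[start:end])]
        largest_interim = max(largest_interim, len(xy_tile))
        # The (+ z) stage consumes the tile immediately, while it is still "hot".
        result.extend(p + c for p, c in zip(xy_tile, z[start:end]))
    return result, largest_interim


n = 10_000
x = [float(i) for i in range(n)]
y = [2.0] * n
z = [1.0] * n
full, full_interim = materialized(x, y, z)
piped, tile_interim = tiled(x, y, z, tile_size=128)
assert full == piped  # same answer either way
print(full_interim, tile_interim)  # 10000 128
```

Both strategies produce identical results. The difference is that the tiled version's interim data is bounded by the tile size rather than by the sequence length, which is what lets it stay in cache.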
Q4. What are the main technical challenges you face when executing distributed query processing and ensuring at the same time high scalability and low latency when working with Big Data?
Andrei Gorine: Efficient Big Data processing almost always requires data partitioning. Distributing data over multiple physical nodes, or even partitions on the same node, and executing software algorithms in parallel allows for better hardware resource utilization by maximizing CPU load and exploiting storage media I/O concurrency. Software lookup and analytics algorithms take advantage of each node’s reduced data set, for example by minimizing the memory allocations required to run the queries. However, distributed data processing comes loaded with challenges. From the standpoint of the database management system, the challenges are two-fold: distributed query optimization and data distribution.
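The basic partition-then-process-in-parallel pattern can be sketched in Python. This is a minimal illustration, assuming hash partitioning on a symbol key. `zlib.crc32` is used instead of Python's built-in `hash()` because string hashing is randomized per process. A `ThreadPoolExecutor` stands in for separate nodes, so it shows the structure rather than real CPU parallelism. All function names are hypothetical.

```python
import zlib
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List, Tuple

Record = Tuple[str, float]  # (symbol, price)


def partition_of(key: str, num_partitions: int) -> int:
    """Stable hash partitioning: the same key always maps to the same partition."""
    return zlib.crc32(key.encode()) % num_partitions


def partition(records: List[Record], num_partitions: int) -> List[List[Record]]:
    parts: List[List[Record]] = [[] for _ in range(num_partitions)]
    for rec in records:
        parts[partition_of(rec[0], num_partitions)].append(rec)
    return parts


def local_max_by_symbol(part: List[Record]) -> Dict[str, float]:
    """Work done independently on one partition's (smaller) data set."""
    best: Dict[str, float] = {}
    for symbol, price in part:
        best[symbol] = max(price, best.get(symbol, price))
    return best


records = [("AAA", 10.0), ("BBB", 20.0), ("AAA", 12.5), ("CCC", 7.0), ("BBB", 19.0)]
parts = partition(records, num_partitions=4)
with ThreadPoolExecutor(max_workers=4) as pool:
    partials = list(pool.map(local_max_by_symbol, parts))

# Each symbol lives in exactly one partition, so merging is a plain union.
result: Dict[str, float] = {}
for p in partials:
    result.update(p)
print(sorted(result.items()))  # [('AAA', 12.5), ('BBB', 20.0), ('CCC', 7.0)]
```

The merge step stays trivial only because the query groups by the same key the data is partitioned on. Grouping by any other column would force the partitions to exchange data, which is where the challenges described next come in.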
First is optimizing distributed query execution plans so that each instance of the query running on a local node is tuned to minimize the I/O, CPU, buffer space and communications cost. Complex queries lead to complex execution plans. Complex plans require efficient distribution of queries through collecting, sharing and analyzing statistics in the distributed setup. Analyzing statistics is not a trivial task even locally, but in the distributed system environment the complexity of the task is an order of magnitude higher.
Another issue that requires a lot of attention in the distributed setting is runtime partition pruning. Partition pruning (skipping partitions that cannot contain rows matching the query) is an essential performance feature. The complication comes from prepared queries: in order to avoid compiling queries every time, the queries are prepared (for example, “select ..where x=10” is replaced with “select ..where x=?”). The problem is that in the unprepared form, the SQL compiler is capable of figuring out that the query is best executed on some known node. Yet in the second, prepared form, that “best” node is not known to the compiler. Thus, the choices are either to send the query to every node, or to locate the node with the given key value during the execution stage.
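The difference between compile-time and runtime pruning can be modeled with a toy router. This minimal Python sketch assumes rows are hash-partitioned on column x across four nodes. `Query`, `node_for_key` and `NUM_NODES` are hypothetical and do not describe any particular SQL engine's planner.

```python
import zlib
from dataclasses import dataclass
from typing import List, Optional

NUM_NODES = 4  # hypothetical cluster size


def node_for_key(key: int, num_nodes: int = NUM_NODES) -> int:
    """Hash-partitioning rule: which node owns rows whose column x equals key."""
    return zlib.crc32(str(key).encode()) % num_nodes


@dataclass
class Query:
    """'select ... where x=<literal>' if literal_key is set, else 'select ... where x=?'."""
    literal_key: Optional[int] = None

    def compile_time_targets(self) -> List[int]:
        # The compiler can prune only when the key appears as a literal in the SQL text.
        if self.literal_key is not None:
            return [node_for_key(self.literal_key)]
        return list(range(NUM_NODES))  # key unknown: every node is a candidate

    def runtime_targets(self, bound_key: Optional[int] = None) -> List[int]:
        # Runtime pruning: once the parameter value is bound, route to its owner only.
        key = self.literal_key if bound_key is None else bound_key
        if key is None:
            return list(range(NUM_NODES))
        return [node_for_key(key)]


unprepared = Query(literal_key=10)
prepared = Query()
print(unprepared.compile_time_targets())  # a single node
print(prepared.compile_time_targets())    # [0, 1, 2, 3]
print(prepared.runtime_targets(10))       # the same single node as the unprepared form
```

Without the runtime step, every execution of the prepared statement would fall back to the broadcast case, even though the bound value identifies exactly one owning node.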
Even when the SQL execution plan is optimized for distributed processing, the efficiency of distributed algorithms heavily depends on the data distribution. Thus, the second challenge is often to devise a data distribution algorithm that optimizes a given set of queries.
Data distribution is especially important for JOIN queries; this is perhaps one of the greatest challenges for distributed SQL developers. In order to build a truly scalable distributed join, the best policy is to have records from all involved tables with the same key values located on the same node. In this scenario all joins are in fact local. In practice, however, this distribution is rare. A popular JOIN technique is to replicate the smaller “dimension” tables on all nodes while sharding the large “fact” tables. However, building dimension tables requires special attention from application developers. The ultimate solution to the distributed JOIN problem is to implement the “shuffle join” algorithm, which redistributes the rows of the joined tables across nodes by join key at query time so that matching rows meet on the same node. Efficient shuffle join is, however, very difficult to put together.
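The core idea of a shuffle join (repartition both inputs by the join key, then run ordinary local joins) fits in a short Python sketch. This is a minimal single-process illustration: the "nodes" are just list indexes. The hard parts mentioned in the text (network transfer, data skew, memory limits) are deliberately not modeled. All names are hypothetical.

```python
import zlib
from collections import defaultdict
from typing import Dict, List, Tuple

Row = Tuple[str, str]  # (join_key, payload)
Joined = Tuple[str, str, str]  # (join_key, left_payload, right_payload)


def bucket(key: str, num_nodes: int) -> int:
    return zlib.crc32(key.encode()) % num_nodes


def shuffle(rows: List[Row], num_nodes: int) -> List[List[Row]]:
    """Redistribute rows so that equal join keys land on the same (simulated) node."""
    out: List[List[Row]] = [[] for _ in range(num_nodes)]
    for row in rows:
        out[bucket(row[0], num_nodes)].append(row)
    return out


def local_hash_join(left: List[Row], right: List[Row]) -> List[Joined]:
    """Ordinary single-node hash join: build on the left input, probe with the right."""
    table: Dict[str, List[str]] = defaultdict(list)
    for key, payload in left:
        table[key].append(payload)
    return [(key, lp, rp) for key, rp in right for lp in table.get(key, [])]


def shuffle_join(left: List[Row], right: List[Row], num_nodes: int) -> List[Joined]:
    left_parts = shuffle(left, num_nodes)
    right_parts = shuffle(right, num_nodes)
    result: List[Joined] = []
    for node in range(num_nodes):  # in a real system each iteration runs on its own node
        result.extend(local_hash_join(left_parts[node], right_parts[node]))
    return result


symbols = [("AAA", "Alpha Corp"), ("BBB", "Beta Inc"), ("CCC", "Gamma Ltd")]
trades = [("AAA", "t1"), ("BBB", "t2"), ("AAA", "t3")]
print(sorted(shuffle_join(symbols, trades, num_nodes=3)))
# [('AAA', 'Alpha Corp', 't1'), ('AAA', 'Alpha Corp', 't3'), ('BBB', 'Beta Inc', 't2')]
```

Because both inputs are bucketed with the same function, every pair of matching rows ends up on the same node. That is why the per-node joins can be purely local and still produce the complete result.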
Q5. What is the STAC M3 Kanaga Benchmark and what is it useful for?
Andrei Gorine: STAC M3 Kanaga simulates financial applications’ patterns over large sets of data. The data is represented via simplified, randomized historical datasets reflecting ten years of trade and quote (TAQ) data. The entire dataset is about 30 terabytes in size. The Kanaga test suite consists of a number of benchmarks aimed at comparing different aspects of its “System Under Test” (SUT), but mostly at highlighting the performance benefits of the hardware and DBMS software used. The Kanaga benchmark specification was written by practitioners from global banks and trading firms to mimic real-life patterns of tick analysis. Our implementations of the STAC-M3 benchmark aim to fully utilize the underlying physical storage I/O channels and maximize CPU load by dividing the benchmark’s large dataset into a number of smaller parts called “shards”. Based on the available hardware resources (the number of CPU cores and physical servers, I/O channels, and sometimes network bandwidth), the number of shards can vary from dozens to hundreds. Each shard’s data is then processed by the database system in parallel, usually using dedicated CPU cores and media channels, and the results of that processing (calculated averages, etc.) are combined into a single result set by our distributed database management system.
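Combining per-shard results into a single result set has one subtlety for averages: shards of different sizes cannot simply have their averages averaged. This minimal Python sketch assumes each shard returns a (sum, count) pair that the coordinator merges. `PartialAvg`, `shard_partial` and `combine` are hypothetical names, not McObject's implementation.

```python
from dataclasses import dataclass
from typing import Iterable, List


@dataclass
class PartialAvg:
    """What each shard returns: mergeable state, not just its own average."""
    total: float
    count: int


def shard_partial(values: Iterable[float]) -> PartialAvg:
    vals = list(values)
    return PartialAvg(total=sum(vals), count=len(vals))


def combine(partials: List[PartialAvg]) -> float:
    """Coordinator step: merge the sums and counts, then divide once."""
    total = sum(p.total for p in partials)
    count = sum(p.count for p in partials)
    return total / count


shards = [[10.0, 20.0, 30.0], [40.0], [50.0, 60.0]]
partials = [shard_partial(s) for s in shards]
print(combine(partials))  # 35.0

# Averaging the per-shard averages gives the wrong answer for uneven shards.
naive = sum(p.total / p.count for p in partials) / len(partials)
print(round(naive, 2))  # 38.33
```

The same pattern extends to other aggregates. Maxima merge with `max`, and volume-weighted averages merge by carrying the weighted sum and the total weight.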
The Kanaga test suite includes a number of benchmarks representative of financial market application patterns:
An I/O-bound HIBID benchmark that calculates the high bid offer value over a period of time (one year for Kanaga). The database management system optimizes processing by parallelizing time-series processing and making extensive use of single instruction, multiple data (SIMD) instructions, yet the total IOPS (input/output operations per second) that the physical storage is capable of is an important factor in achieving better results.
The “market snapshot” benchmark stresses the SUT (the database and the underlying hardware storage media), requiring them to perform well under a high-load parallel workload that simulates the multi-user data access patterns of real-world financial applications. In this test, (A) the ability to execute columnar-storage operations in parallel, (B) efficient indexing and (C) low storage I/O latency play important roles in getting better results.
The volume-weighted average bid (VWAB) benchmarks, which calculate the volume-weighted average bid over a one-day period. On the software side, the VWAB benchmarks benefit from the columnar storage and analytics function pipelining discussed above, which maximize CPU cache utilization and CPU bandwidth and reduce main memory requirements. Hardware-wise, I/O bandwidth and latency play a notable role.
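Once the data is laid out in timestamp-ordered columns, the calculations behind the HIBID and VWAB queries are simple. The following is a minimal Python sketch, assuming three parallel column arrays (timestamp, bid, bid size) sorted by timestamp and a half-open [start, end) window. The function names are hypothetical, and the formulas are the textbook definitions, not the exact STAC-M3 specification.

```python
from bisect import bisect_left
from typing import List, Tuple


def window_bounds(ts: List[int], start: int, end: int) -> Tuple[int, int]:
    """Timestamps are sorted, so the [start, end) window is one contiguous slice."""
    return bisect_left(ts, start), bisect_left(ts, end)


def high_bid(ts: List[int], bid: List[float], start: int, end: int) -> float:
    """HIBID-style query: highest bid inside the window (reads the bid column only)."""
    lo, hi = window_bounds(ts, start, end)
    return max(bid[lo:hi])


def vwab(ts: List[int], bid: List[float], size: List[int],
         start: int, end: int) -> float:
    """Volume-weighted average bid: sum(bid * size) / sum(size) over the window."""
    lo, hi = window_bounds(ts, start, end)
    weighted = sum(b * s for b, s in zip(bid[lo:hi], size[lo:hi]))
    return weighted / sum(size[lo:hi])


ts = [0, 10, 20, 30]
bid = [100.0, 101.0, 99.5, 98.0]
size = [200, 100, 100, 50]
print(high_bid(ts, bid, 0, 30))    # 101.0
print(vwab(ts, bid, size, 0, 30))  # 100.125
```

Each query touches only the columns it needs, and because the data is sorted by timestamp, the window reduces to a contiguous slice that is scanned sequentially.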
Andrei Gorine, Chief Technical Officer, McObject.
McObject co-founder Andrei leads the company’s product engineering. As CTO, he has driven growth of the eXtremeDB real-time embedded database system, from the product’s conception to its current wide usage in virtually all embedded systems market segments. Mr. Gorine’s strong background includes senior positions with leading embedded systems and database software companies; his experience in providing embedded storage solutions in such fields as industrial control, industrial preventative maintenance, satellite and cable television, and telecommunications equipment is highly recognized in the industry. Mr. Gorine has published articles and spoken at many conferences on topics including real-time database systems, high availability, and memory management. Over the course of his career he has participated in both academic and industry research projects in the area of real-time database systems. Mr. Gorine holds a Master’s degree in Computer Science from the Moscow Institute of Electronic Machinery and is a member of IEEE and ACM.
Follow us on Twitter: @odbmsorg