Partitioning data in kdb+
13 Aug 2020 | kdb+, q language
by Rian O’Cuinneagain
In the world of big data, the opportunities that organizations can envision are enormous, but so are the challenges in realizing them. Some can readily use well-rounded off-the-shelf architectures, while others may have to be more creative. Users with high-value data, such as in finance, where being the fastest counts, can justify the expense of high-RAM systems. In other industries, the volume of data can be staggering but each individual piece in isolation is of little value, so any database needs to make efficient use of compute resources in extracting the underlying trends and insights across broad swathes of data. One such example is predictive maintenance. At the core of Kx lies kdb+, which is a programming language created to implement databases. This provides extreme flexibility when designing systems with unique query patterns on limited hardware. One key concept enabling this is the partitioning of data.
Kdb+ supports partitioned databases. This means that when data is stored to disk it is partitioned into different folders.
/HDB
sym
…
/2020.06.25
/trade
/quote
/2020.06.26
/trade
/quote
…
Each day when data is stored, a new date folder is created. Once the data is loaded into a process, a virtual date column is created. This allows the user to include a date filter in their where clause:
select from quote where date=2020.06.25,sym=`FDP
This physical partitioning and seamless filtering allows kdb+ to answer such queries quickly, as a full database scan is not required to retrieve the data.
Furthermore, native map-reduce allows queries which span multiple partitions to make use of multi-threading for further speedup:
select vwap: size wavg price by sym from trade
  where date within 2020.06.01 2020.06.26
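To make the map-reduce idea concrete, here is a minimal in-memory sketch. The two small tables `p1` and `p2` are hypothetical stand-ins for two date partitions, and the explicit map and reduce steps only illustrate the decomposition; they are not how kdb+ implements it internally. A weighted average cannot be combined by averaging per-partition averages, so each partition returns partial sums which are then combined.

```q
/ hypothetical stand-ins for two date partitions of a trade table
p1:([]sym:`a`a`b;price:10 11 20f;size:100 200 50)
p2:([]sym:`a`b;price:12 21f;size:300 150)

/ map: each partition returns partial sums per sym
m:{select sp:sum size*price,s:sum size by sym from x} each (p1;p2)

/ reduce: add the partial sums together and take the ratio
r:select vwap:sum[sp]%sum s by sym from raze 0!'m

/ direct calculation over all rows, for comparison
d:select vwap:size wavg price by sym from p1,p2
r~d
```

The map step runs independently per partition, which is why it can be spread across threads.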
Aside from date, the other possible choices for the partition domain are year, month, and int.
In the rest of this post, we will explore some uses of int partitioning.
Note: This post serves as a discussion on a topic – it is not intended as deployable code in mission critical systems.
Hourly partitioning
Hourly partitioning can be used as a way to reduce the RAM footprint of a kdb+ system. This solution is very simple to implement but not as powerful as a fully thought-out intraday-writedown solution.
Firstly a helper function is needed to convert timestamps to an int equivalent:
hour:{`int$sum 24 1*`date`hh$\:x}
This hour function takes a timestamp and calculates the number of hours since the kdb+ epoch:
q)hour 2000.01.01D01
1i
q)hour 2020.06.27D16
179608i
Taking kdb-tick as a template, very few changes are needed to explore hourly partitioning.
- Edits in tick.q are mainly focused on using hour .z.P rather than .z.D, along with some renaming of variables for clarity, replacing day with hour.
- Changes were also needed in tick.q and r.q related to the naming of the tickerplant log file. While all dates are 10 characters long, the int hour value will eventually grow in digits. At 4 pm on Monday, January 29th, 2114 to be exact!
q)hour 2114.01.29D16
1000000i
- Moving the time column from a timespan (n) to a timestamp (p) was chosen. This datatype does not use more space or lose any precision, and it includes the date, which is helpful now that the date column is removed. Another option is a helper function to extract the date back from the encoded int column:
q)intToDate:{`date$x div 24}
q)hour 2020.06.27D16
179608i
q)intToDate 179608i
2020.06.27
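The encoding can also be inverted down to the hour rather than only the date. The sketch below assumes the 2000.01.01 epoch used so far; `intToStart` is a hypothetical helper name that is not part of the original code.

```q
hour:{`int$sum 24 1*`date`hh$\:x}

/ hypothetical helper: the timestamp at which an hour partition begins
/ x div 24 recovers the day count, x mod 24 the hour within that day
intToStart:{(`timestamp$`date$x div 24)+0D01*x mod 24}

intToStart 179608i                        / 2020.06.27D16:00:00.000000000
intToStart hour 2020.06.27D16:20:46       / same partition start
```

This can be handy when listing partitions on disk and wanting to see the time range each one covers.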
The full extent of the changes is best explored by reviewing the git commit.
Once the HDB process reloads after an hour threshold has been crossed you can explore the data. On disk the int partition folders can be seen:
/HDB
sym
…
/179608
/trade
/quote
/179609
/trade
/quote
…
When querying the HDB the virtual int column is visible:
q)quote
int sym time bid ask bsize asize
-------------------------------------------------------------------------
179608 baf 2020.06.27D16:20:46.040466000 0.3867353 0.3869818 5 7
179608 baf 2020.06.27D16:20:46.040466000 0.726781 0.6324114 2 8
The same hour function can be used to query the data efficiently:
select from trade where int=hour 2020.06.27D16
select from trade where int within hour 2020.06.26D0 2020.06.27D16
If you wish to store data prior to the kdb+ epoch 2000.01.01D0 you will need to make some adjustments. This is due to a requirement for the int partitions to have non-negative values.
To use a different epoch only small changes are needed. Here 1970.01.01 is used:
hour:{`int$sum 24 1*@[;0;-;1970.01.01] `date`hh$\:x}
intToDate:{1970.01.01+x div 24}
q)hour 2020.06.27D16
442576i
q)intToDate 442576i
2020.06.27
Fixed size partitioning
One possible concern with hourly partitioning would be the fact that data does not always stream at a steady rate. This would lead to partitions of varying sizes and would not protect a system well if there was a sudden surge in the volume of incoming data.
To create a system with a more strictly controlled upper limit on memory usage, we will build an example which flushes data to disk based on a trigger condition: the size of the tickerplant log. This is used as a proxy for how much RAM the RDB is likely to be using. The trigger could easily be reconfigured to fire based on total system memory usage or any other chosen value.
A new command-line value is passed which is accessed with .z.x and multiplied by the number of bytes in a megabyte:
\d .u
n:("J"$.z.x 2)*`long$1024 xexp 2;
This new n variable is compared to the size of the log file, as given by hcount, each time data is appended. If the threshold is breached then the endofpart call is triggered:
if[n<=hcount L;endofpart[]]
Note: While this method is very exact, it is not recommended in a tickerplant receiving many messages, as polling the filesystem for the file size after every append can be slow.
The int value now starts from 0 and increments each time a partition is added:
q)select from quote
int sym time bid ask bsize asize
----------------------------------------------------------------------
0 baf 2020.06.28D17:15:54.751561000 0.3867353 0.3869818 5 7
0 baf 2020.06.28D17:15:54.751561000 0.726781 0.6324114 2 8
On startup the tickerplant must list all files using key and determine the maximum partition value to use:
p:{
  f:x where x like (get `..src),"_*";
  $[count f;max "J"$.[;((::);1)]"_" vs'string f;0]
  } key `:.
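The parsing step can be tried in isolation. In this sketch the directory listing `files` and the prefix `src` are hypothetical values chosen for illustration; in the tickerplant they come from `key` and the configured source name. Casting to long before taking the maximum matters: compared as strings, "10" would sort before "2".

```q
src:"sym"                            / assumed log-file prefix
files:`sym_0`sym_1`sym_10`lookup     / hypothetical directory listing

/ keep only the log files, then take the text after the underscore
f:files where files like src,"_*"
parts:"J"$.[;((::);1)]"_" vs'string f

parts        / 0 1 10
max parts    / 10
```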
Now that our partitions are no longer tied to a strict time domain the previous solution of a smaller helper function is not sufficient to enable efficient querying. A lookup table will be needed to enable smart lookups across the partitions.
q)lookup
part tab minTS maxTS
----------------------------------------------------------------------
0 quote 2020.06.28D17:14:33.520763000 2020.06.28D17:15:54.751561000
0 trade 2020.06.28D17:14:33.515537000 2020.06.28D17:15:54.748619000
1 quote 2020.06.28D17:15:54.762522000 2020.06.28D17:16:57.867296000
1 trade 2020.06.28D17:15:54.757298000 2020.06.28D17:16:57.864316000
This table sits in the root of the HDB. Each time a partition is written, the lookup table has new information appended to it by .u.addLookup:
.u.addLookup:{
  `:lookup/ upsert .Q.en[`:.] raze {select part:enlist x,tab:enlist y,
    minTS:min time,maxTS:max time from y}[x] each tables[]
  };
saveAndReload replaces .Q.hdpf, as cacheLookup now needs to be called when the HDB reloads:
k)saveAndReload:{[h;d;p;f]
  (@[`.;;0#].Q.dpft[d;p;f]@)'t@>(#.:)'t:.q.tables`.;
  if[h:@[hopen;h;0];
    h"system\"l .\";cacheLookup[]";>h]
  };
cacheLookup reads the lookup table from disk and creates an optimized dictionary, intLookup, which will be used when querying data:
cacheLookup:{
  if[`lookup in tables[];
    intLookup::.Q.pt!{
      `lim xasc ungroup select (count[i]*2)#part,
        lim:{x,y}[minTS;maxTS] from lookup where tab=x
      } each .Q.pt];
  };
A new helper function, findInts, is how users will perform efficient queries on this database:
findInts:{[t;s;e] exec distinct part from intLookup[t] where lim within (s;e)}
q)select from quote where
int in findInts[`quote;2020.06.28D17:15:54.75;2020.06.28D17:15:54.77],
time within 2020.06.28D17:15:54.75 2020.06.28D17:15:54.77
int sym time bid ask bsize asize
----------------------------------------------------------------------
0 baf 2020.06.28D17:15:54.751561000 0.3867353 0.3869818 5 7
0 baf 2020.06.28D17:15:54.751561000 0.726781 0.6324114 2 8
1 baf 2020.06.28D17:15:54.762522000 0.3867353 0.3869818 5 7
1 baf 2020.06.28D17:15:54.762522000 0.726781 0.6324114 2 8
1 igf 2020.06.28D17:15:54.762522000 0.9877844 0.7750292 9 4
The full extent of the changes is best explored by reviewing the git commit.
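One case worth checking with any boundary-based lookup is a query window that lies strictly inside a single partition: it contains neither that partition's minTS nor its maxTS, so a test of which boundaries fall within the window would return nothing for it. The sketch below shows an interval-overlap test as one possible alternative. It queries a toy copy of the lookup table directly rather than intLookup, and `findIntsOverlap` is a hypothetical name, not part of the original code.

```q
/ toy copy of the lookup table shown above
lookup:([]part:0 0 1 1;tab:`quote`trade`quote`trade;
  minTS:2020.06.28D17:14:33.520763000 2020.06.28D17:14:33.515537000 2020.06.28D17:15:54.762522000 2020.06.28D17:15:54.757298000;
  maxTS:2020.06.28D17:15:54.751561000 2020.06.28D17:15:54.748619000 2020.06.28D17:16:57.867296000 2020.06.28D17:16:57.864316000)

/ a partition overlaps the window when it starts on or before e and ends on or after s
findIntsOverlap:{[t;s;e] exec distinct part from lookup where tab=t,minTS<=e,maxTS>=s}

/ window spanning the boundary between partitions 0 and 1
a:findIntsOverlap[`quote;2020.06.28D17:15:54.75;2020.06.28D17:15:54.77]
/ window strictly inside partition 0
b:findIntsOverlap[`quote;2020.06.28D17:15:00;2020.06.28D17:15:10]
```

Here `a` is `0 1` and `b` is `,0`. The trade-off is two comparisons per lookup row rather than one, which is negligible for a table with one row per partition and table.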
Alternate methods to control when to partition
Rather than polling the file system to use as a metric to trigger the creation of a new partition other methods could be chosen. Methods can be basic, needing some human tuning of limits to be useful, or exact (even for dynamic incoming data) but possibly computationally expensive.
One choice would be to keep a basic count of cumulative rows across all incoming table data and trigger at a pre-set limit. However, the resulting size of data could vary wildly depending on the number of columns in the tables.
For a slightly more dynamic and accurate method, one could use a lookup dictionary of the size in bytes of each datatype:
typeSizes:(`short$neg (1+til 19) except 3)!1 16 1 2 4 8 4 8 1 8 8 4 4 8 8 4 4 4
calcSize:{sum count[x]*typeSizes type each value first x}
To test we replay a 5121KB tickerplant transaction log using -11!:
q)quote:([]time:`timestamp$();sym:`symbol$();bid:`float$();ask:`float$();bsize:`int$();asize:`int$())
q)trade:([]time:`timestamp$();sym:`symbol$();price:`float$();size:`int$())
q)upd:insert
q)-11!`sym_0
12664
q)div[;1024] sum calcSize each (trade;quote)
4204
The resulting estimate is 4204KB. Comparing this to the size of the same data as stored on disk (uncompressed) results in a similar 4244KB:
$du -s HDB/0
4244 HDB/0
The main flaw with the calcSize function is its inability to calculate the size of data in array columns, such as the string type. It could be extended to account for this, but then its complexity and run time would increase, as it would need to interrogate each cell rather than using only the first row as it does in its basic form.
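As a rough sketch of what such an extension might look like, the version below sizes simple columns as before and walks each cell of a nested column. `colSize` and `calcSizeNested` are hypothetical names, and the approach ignores per-vector overheads, attributes, and cells nested more than one level deep.

```q
typeSizes:(`short$neg (1+til 19) except 3)!1 16 1 2 4 8 4 8 1 8 8 4 4 8 8 4 4 4

/ simple vector: row count times item width
/ general list (type 0h): sum the size of every cell, which is the slow part
colSize:{$[0h=t:type x;
  sum {count[x]*typeSizes neg abs type x} each x;
  count[x]*typeSizes neg t]}

calcSizeNested:{sum colSize each value flip x}

/ toy table with one string column and one long column
t:([]s:("ab";"cde");n:1 2)
calcSizeNested t    / 5 bytes of chars plus 16 bytes of longs
```

The per-cell iteration is what makes this more expensive than the first-row approach, which is the trade-off described above.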
Kdb+ itself provides a shortcut to calculate the IPC serialized size of an object with -22!:
q)div[;1024] sum -22!/:(trade;quote)
3710
While optimized for speed, -22! remains an expensive operation. It also gives inaccurate results for symbol data: in memory symbols are interned for efficiency, but during IPC transfer they use varying space depending on their length.
Both calcSize and -22! are also unable to account for the memory overheads associated with any columns which have attributes applied to them.
In the process itself .Q.w can be interrogated to view actual memory reserved in the heap and used by objects:
q)div[;1024] .Q.w[]`heap`used
65536 4702
Whilst .Q.w in the RDB may seem like a good trigger, in practice having the tickerplant poll another process is not a good idea. The tickerplant is designed to be a self-contained process which reliably stores the transaction log and can never be blocked by a downstream process; it publishes data asynchronously for this reason.
Overall this is an area where the “keep it simple, stupid” principle applies. There is little benefit to attempting to be too exact. Choosing a simple method and allowing a cautious RAM overhead for any inaccuracy is the best path to follow.
Handling late data
Filter time buffer
The hour helper is exact, and this may be too exact for some use cases. One example is a table with multiple timestamp columns which are created as the data flows through various processes. These other timestamp columns will be slightly behind the final timestamp created in the tickerplant.
Note: This issue is not limited to int partitioning, and the approaches below can be beneficial in any partitioned database.
If a user queries without accounting for this they could be presented with incomplete results:
select from trade where
int within hour 2020.06.26D0 2020.06.26D07,
otherTimeCol within 2020.06.26D0 2020.06.26D07
This can be manually accounted for by adding a buffer to the end value of your time window. Here one minute is used:
select from trade where
int within hour 0D 0D00:01+2020.06.26D0 2020.06.26D07,
otherTimeCol within 2020.06.26D0 2020.06.26D07
Better still would be to wrap this in a small utility for ease of use:
buffInts:{hour 0D 0D00:01+x}
select from trade where int within buffInts 2020.06.26D0 2020.06.26D07,
  otherTimeCol within 2020.06.26D0 2020.06.26D07
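The buffer only changes which partitions are read when the end of the window sits close to an hour boundary. The sketch below makes the buffer a parameter so that this is easy to see; `buffIntsBy` is a hypothetical variant that is not in the original code, and the window `w` is chosen to end 30 seconds before the hour.

```q
hour:{`int$sum 24 1*`date`hh$\:x}

/ hypothetical variant: b is the timespan added to the end of the window
buffIntsBy:{[b;x] hour (0D;b)+x}

w:2020.06.26D0 2020.06.26D06:59:30

hour w                   / 179568 179574i
buffIntsBy[0D00:01;w]    / 179568 179575i, one extra partition is scanned
```

A larger buffer catches later data at the cost of reading more partitions.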
Extended lookup table
Choosing a buffer value is an inexact science. A more efficient solution is to use a lookup table, which allows for fast queries in both the hourly and fixed size partition examples. The table can be extended to include any extra columns as needed. .u.addLookup is edited to gather stats on the extra columns:
.u.addLookup:{
  `:lookup/ upsert .Q.en[`:.] raze {select part:enlist x,tab:enlist y,
    minTS:min time,maxTS:max time,
    minOtherCol:min otherCol,maxOtherCol:max otherCol
    from y}[x] each tables[]
  };
q)lookup
part tab minTS maxTS minOtherCol maxOtherCol
--------------------------------------------
The behavior of cacheLookup and the intLookup it creates are now also changed:
cacheLookup:{
  if[`lookup in tables[];
    intLookup::`lim xasc ungroup select column:`time`time`otherCol`otherCol,
      lim:(minTS,maxTS,minOtherCol,maxOtherCol) by part,tab from lookup];
  };
findInts:{[t;c;s;e]
  exec distinct part from intLookup where tab=t,column=c,lim within (s;e)
  }
The user would then pass in an extra parameter to findInts to specify which column to use when choosing int partitions:
findInts[`quote;`otherCol;2020.06.28D16;2020.06.28D17]
These lookup tables are very powerful. They help not only in cases where data is slightly delayed; any delay can now be handled gracefully. Even if data is months late, the lookup table protects against expensive full database scans, and against users missing data because their partition filter assumed a certain maximum ‘lateness’ of data.
Reducing number of files
One side effect of int partitioning is a larger number of files being created on disk. At query time this can result in slower response time if many partitions need to be opened and scanned. Errors can also occur if the process ulimit is breached. At an extreme, the file system may run out of inode allocation space. Choosing how often a partition is created is one way to prevent too many files. Another is to implement a defrag process which will join several partitions together.
This process is started and passed the ports for the RDB and HDB:
q defrag.q -s 4 ::5011 ::5012
-s 4 is passed to create secondary threads so multiple cores are used to speed up the task.
The defrag function is then available, which takes the following parameters:
- hdb – hsym to root of HDB
- src – int list of partitions to combine
- dst – int destination partition
- comp – compression settings
- p – symbol column name to apply parted attribute on
- typ – symbol hourly or fixed to specify the type of HDB
Its source is viewable in defrag.q
Reducing hourly partitions
defrag[`:hourly/HDB;179608 179609;179608;17 2 6;`sym;`hourly]
For data to remain queryable in a performant manner there are some requirements:
- Partitions being joined must be contiguous
- The destination must be the minimum partition of the source list
These requirements are related to how the previously used hour function will be replaced. Now that the partitions are combined it will not function correctly:
q)select from trade where int in hour 2020.06.27D17,
time within 2020.06.27D17 2020.06.27D18
int sym time price size
-----------------------
This is due to the function expecting the data to be in partition 179609, which no longer exists as it has been merged into 179608.
To solve this we can make use of the bin function. It returns the prevailing bucket a value falls into:
q)list:0 2 4
q)list bin 0 1 3 5
0 0 1 2
q)list list bin 0 1 3 5
0 0 2 4
When kdb+ loads a partitioned database it creates a global variable which contains the list of all partitions. For a date partitioned database it is named date, for an int partitioned database int, etc. This list can then be used to extend our hour function with bin to find the correct bucket:
q)hour:{int int bin `int$sum 24 1*`date`hh$\:x}
Our combined hours now correctly return that they both reside within a single partition:
q)hour 2020.06.27D16
179608
q)hour 2020.06.27D17
179608
The previously failing query now succeeds:
q)select from trade where int in hour 2020.06.27D17,
time within 2020.06.27D17 2020.06.27D18
int sym time price size
------------------------------------------------------
179608 baf 2020.06.27D17:00:00.000000000 0.949975 1
179608 baf 2020.06.27D17:00:00.050000000 0.391543 2
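The two requirements listed earlier follow directly from how bin resolves a value to the nearest partition at or below it. The sketch below reproduces the lookup outside of an HDB: `parts` is a hypothetical partition list standing in for the int global after 179609 has been merged into 179608, and `rawHour` and `hourBin` are illustrative names.

```q
/ raw hour helper from earlier in the post
rawHour:{`int$sum 24 1*`date`hh$\:x}

/ hypothetical partition list after merging 179609 into 179608
parts:179607 179608 179610i

/ same shape as the bin-based hour function, reading from parts
hourBin:{parts parts bin rawHour x}

hourBin 2020.06.27D16    / 179608i
hourBin 2020.06.27D17    / 179608i, the merged hour resolves to its destination
hourBin 2020.06.27D18    / 179610i
```

Had the destination been the maximum of the source list (179609) instead, hour 16 would resolve downwards to 179607 and its data would be missed, which is why the destination must be the minimum.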
Reducing fixed partitions
defrag[`:fixed/HDB;0 1 2 3 4;0;17 2 6;`sym;`fixed]
The requirements about how partitions are combined, which applied to the hourly database, do not apply to the fixed database. This is because the lookup table exists.
After running defrag no changes are needed in the helper function findInts. Instead, during defrag the lookup table is updated with the latest information regarding partitions. To ensure two processes do not try to write to lookup simultaneously, the RDB is contacted to perform this step.
The logic for this is best explored by viewing the reloadFixed function defined in defrag.q.
This blog has shown the flexibility int partitioning delivers by working through 2 examples. There are many other possible ways to exploit this feature. Some links to other examples are provided below.
- https://code.kx.com/q/wp/intraday-writedown/
- https://www.aquaq.co.uk/q/kdb-iot/
- https://groups.google.com/g/personal-kdbplus/c/oe4UANpB1rM
Full source code of this blog is available on Github.
Sponsored by Kx