Vertica: The Kafka Streaming Load Scheduler
by Tom Wall.
Vertica’s streaming load scheduler provides high-performance streaming data load from Kafka into your Vertica database. Whether you already use Kafka or not, it is worth considering it as a solution to your data loading challenges. Kafka complements Vertica very nicely, and the scheduler removes many complexities of designing a robust data loading process that you would otherwise have to build into your own ETL or application logic.
Most Vertica loading mechanisms have a batch-oriented workflow, where you use the COPY statement to load a batch of data into a single table. In this world, applications have to manage batches carefully to ensure that data arrives in Vertica in a consistent state with good performance.
The Kafka load scheduler uses a higher-level streaming abstraction. With this model, data is continuously generated, and we want to load that unbounded data set into Vertica as it arrives. Rather than managing batches and issuing COPY statements directly, you configure the scheduler to load Kafka topics into Vertica tables, and the scheduler manages the load process by intelligently dispatching COPY statements in small batches on your behalf.
The scheduler is a java-based daemon that dispatches commands to your target database via JDBC. It leverages many of Vertica’s load and workload management capabilities to provide a robust loading experience.
An important design point of the scheduler is that it keeps all configuration and runtime states in the database alongside the data you are loading. By keeping everything centralized, it makes it possible to load from data in a fault-tolerant, exactly-once fashion via Vertica transactions.
Since the scheduler consumes some database resources, you naturally have to provide some information to tell the scheduler how to do so. The scheduler needs a connection string in order to identify and connect to a database. Once connected, it needs to be given a config schema, a database schema where its configuration and runtime state is kept, and which resource pool it should use when performing the load.
The scheduler implements the streaming abstraction by dispatching units of work calledmicrobatches. Each microbatch is an atomic load and progress update – a COPY statement loads some data, and an insert into the runtime state table records which offsets were successfully loaded from Kafka. Because this operation is atomic, the process remains in a consistent state even if Kafka or Vertica nodes go down during the load. Failed requests are automatically retried, and provided that the data is still available in Kafka when the issue is corrected, the load process can resume wherever it left off.
When streaming, there is a tradeoff to be made between latency and throughput. You want to commit new loads as quickly as possible so that data can be available for query, but if you make the window too small, you end up spending more time on setup and overhead than you do actually moving data. The scheduler takes a time-based approach to batching. The infinite stream of time is broken intoframes of a specified length of time. This latency-throughput tradeoff is controlled by the scheduler’sframe duration parameter. This duration is also the basis for the scheduling heuristic when the scheduler manages multiple streams.
Combining it all together, the scheduler follows a relatively simple process:
1. Upon startup, configuration is read from the config schema.
2. The scheduler inspects the resource pool and the workload to determine how much work must be done in the configured frame duration
3. Each frame queries the runtime state table to determine the progress made by the microbatches in the previous frame. Then, for each microbatch:
a. Begin a transaction
b. Start a time-based load from Kafka starting at the ending offset from the previous frame
c. Record the progress made in the runtime table
d. Commit the transaction
In future posts, we will go into detail about how the scheduler manages resources and computes this workload.
Using the Scheduler
The streaming load scheduler comes pre-packaged and installed with the Vertica rpm. You can use the scheduler by running the vkconfig script:
This script takes several sub-commands used to configure, start and stop the scheduler.
Scheduler – creates and maintains the global scheduler configuration
Cluster – manages the connection settings for the Kafka cluster(s) being loaded
Source – informs the scheduler about the Kafka topics being loaded
Target – informs the scheduler about the Vertica tables being loaded into
Load-spec – manages the COPY options used by the scheduler when it dispatches loads
Microbatch – combines the configuration a cluster, source, target & load spec to define a unit of work to be done during each frame.
Launch – starts the scheduler process
Shutdown – kills the scheduler process
All of the configuration specified in these commands is persisted in the scheduler’s config schema. By keeping this configuration in the database, it makes the scheduler process largely stateless and makes it easy to read the configuration via SQL. You can read about these tables here.
Suppose you have a mobile game that recently exploded in popularity. Congratulations! Users are constantly signing up to use your service, and so you must design a loading pipeline that can handle 24×7 loads with bursts in activity.
Maybe the devices are submitting registration via HTTP POST containing a JSON body. The web server receives those requests, validates them and then and produces them to a “users” topic. With a few quick steps, you can set up a scheduler to have that data automatically loaded into Vertica as it arrives in Kafka.
First, you’ll want to define where the data in Vertica will be stored. Since the mobile team is rapidly iterating on their application, the data format may vary between versions of the application that submit the registration, and you don’t want the load to break if the fields change.
We will use a flex table to accommodate the potential variance in the records that arrive:
vsql -c “CREATE FLEX TABLE public.dest();”
Next, you’ll want to define a resource pool to isolate the workload. By carving out a chunk of the system’s resources, other applications and users querying the system won’t be impacted by the load process. The scheduler is resource-pool aware, so it will do its best to maximize the utilization of those resources without oversubscribing. In this case, we assigned PLANNEDCONCURRENCY a value of 2, which allows two requests to run at once – one slot is for the scheduler’s internal operations, while the other one will perform loads. With an EXECUTIONPARALLELISM value of 3, each Vertica node will use up to 3 threads to perform the load in parallel. Ideally the total number of load threads across the Vertica cluster should equal the number of Kafka partitions, so that each can be read in parallel.
vsql -c "create resource pool sched_pool plannedconcurrency 2 executionparallelism 3"
Scheduler Global Configuration
With the database resources defined, you can now set up a scheduler to perform the load. Since the scheduler stores all of its configuration and runtime state in Vertica, it needs to be told how to reach Vertica. In particular, it needs a JDBC URL to reach the database, and then within the database, it needs to be told which schema holds the scheduler’s configuration tables.
Since these options are required in all CLI invocations, and since they don’t change frequently, it is often best to store them in a configuration file.
echo “jdbc-url=jdbc:vertica://localhost:5433/mydb” >> scheduler.properties
echo “config-schema=mysched” >> scheduler.properties
You can store any CLI option in the conf file. The scheduler will look for options in the conf file before processing the command-line arguments. This makes it useful for storing sensitive information like passwords, or to avoid a layer of shell quote processing when passing arguments that contain SQL.
With the static portions of the configuration defined, we can now define a scheduler. Here we create a scheduler that will load every 2 minutes using the resource pool we created earlier.
vkconfig scheduler --create –conf scheduler.properties --resource-pool sched_pool --frame-duration 00:02:00
Scheduler Workload Configuration
Now that the scheduler has been defined, we can define the workload. In the end we will create a microbatch – a load of a topic into a table that will run each frame – but in order to do so we must first define some prerequisites. The scheduler uses information provided in the next few steps to generate COPY statements to perform the load.
The first step is to define a cluster. This command informs the scheduler about the connection information for the Kafka cluster. The scheduler will query Kafka to discover any brokers that you don’t specify, so you can save some typing by only listing one or two here.
vkconfig cluster --create --cluster mycluster --hosts localhost:9092 --conf scheduler.properties
Next, you’ll create a source. This is used to specify a topic within a Kafka cluster. By default, the scheduler will automatically query the topic to ensure that it exists and assume that all partitions in the topic should be loaded.
vkconfig source --create --source mytopic --cluster mycluster –----conf scheduler.properties
Now that the topic can be identified, we must specify how Vertica should load the data. Below, we tell the scheduler about the flex table we created earlier.
vkconfig target --create --target-schema public --target-table dest --conf scheduler.properties
Next, we have to specify how the scheduler should load the data. A load-spec is a named collection of options that the scheduler uses when it generates a COPY statement to do the load. Since the data is JSON, we will use the KafkaJSONParser. Since we expect there to be a lot of data, we will load DIRECT.
vkconfig load-spec --create --load-spec jsondirect --parser KafkaJSONParser --load-method DIRECT --conf scheduler.properties
Finally, with all of the prerequisite information defined, we can create a microbatch.
vkconfig microbatch --create --microbatch mybatch --target-schema public --target-table dest --load-spec jsondirect --conf scheduler.properties
To run the scheduler, use the launch command. The launch command runs in the foreground, and will only stop when the process receives a termination signal. You can script around the launch command to run it on demand, or perhaps leave it running continuously in the background.
vkconfig launch &
While the scheduler runs, data will be continuously loaded into the target table. You can monitor the progress of the load by querying the stream_microbatch_history runtime state table, or by monitoring the loads in Management Console.
While this example only loaded one topic with one microbatch, the scheduler allows you to add more loads to the workload. In future posts, we will discuss how the scheduler actually schedules these tasks, how you can monitor them, and how you can tune things to get the best possible performance.