Event Stream Slicing
Header <dv-sdk/processing/core.hpp>
Event Stream Slicing cuts the input data into packets of customizable length, measured either in time or in number of events.
The run function of a DV module gets called whenever a new data packet is available at an input. The size of these packets is determined by the upstream module that generates the data. However, a module often needs to control how much data it processes at a time.
Example Setup
The following minimal module takes in events and slices them into chunks of 100 ms as well as chunks of 100 events.
#include <dv-sdk/module.hpp>
#include <dv-sdk/processing/core.hpp>

class MyModule : public dv::ModuleBase {
private:
    dv::EventStreamSlicer slicer;

public:
    static const char *initDescription() {
        return "Slicing demo";
    }

    static void initInputs(dv::InputDefinitionList &in) {
        in.addEventInput("events");
    }

    static void initConfigOptions(dv::RuntimeConfig &configuration) {}

    void doEvery100ms(const dv::EventStore &data) {
        // Gets executed on every 100ms of event data
        // with the event data in `data`
    }

    void doEvery100Events(const dv::EventStore &data) {
        // Gets executed every 100 events
        // with the events in `data`
    }

    MyModule() {
        // assign jobs to slicer; the time interval is given in microseconds (100000 us = 100 ms)
        slicer.doEveryTimeInterval(100000, [this](const dv::EventStore &data) {
            doEvery100ms(data);
        });
        slicer.doEveryNumberOfEvents(100, [this](const dv::EventStore &data) {
            doEvery100Events(data);
        });
    }

    void run() override {
        // hand every incoming packet to the slicer, which invokes the jobs when enough data is available
        slicer.accept(inputs.getEventInput("events").events());
    }
};

registerModuleClass(MyModule);
Description
An EventStreamSlicer is added as a member variable to the module class. Whenever new data comes in (the run function is called), the data gets added to the slicer. The slicer accumulates data so that it can slice it into correctly sized chunks and invoke the callback functions with those chunks.
The slicer retains data until it has passed it to all callback functions. It is guaranteed that every event gets submitted to every callback function exactly once. The slicer does the internal bookkeeping to figure out when it is safe to release data.
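The accumulate-then-slice behaviour described above can be illustrated with a small toy model. This is only a sketch under stated assumptions, not the dv::EventStreamSlicer implementation: ToyEvent, ToyCountSlicer and demoCountSlicing are hypothetical names, a std::vector stands in for dv::EventStore, and only a single count-based job is modelled.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical stand-in for a single event; not the dv event type.
struct ToyEvent {
    std::int64_t timestamp; // microseconds
};

// Toy model of count-based slicing with one job (assumption: the real
// slicer is more general and tracks several jobs at once).
class ToyCountSlicer {
public:
    using Callback = std::function<void(const std::vector<ToyEvent> &)>;

    ToyCountSlicer(std::size_t chunkSize, Callback callback)
        : chunkSize_(chunkSize), callback_(std::move(callback)) {
        assert(chunkSize_ > 0);
    }

    // Buffer the incoming packet, then deliver as many full chunks as possible.
    void accept(const std::vector<ToyEvent> &packet) {
        buffer_.insert(buffer_.end(), packet.begin(), packet.end());

        std::size_t consumed = 0;
        while (buffer_.size() - consumed >= chunkSize_) {
            const auto first = buffer_.begin() + static_cast<std::ptrdiff_t>(consumed);
            const std::vector<ToyEvent> chunk(first, first + static_cast<std::ptrdiff_t>(chunkSize_));
            callback_(chunk);
            consumed += chunkSize_;
        }

        // Release only events that were delivered; the remainder waits for more data.
        buffer_.erase(buffer_.begin(), buffer_.begin() + static_cast<std::ptrdiff_t>(consumed));
    }

    // Number of events still waiting to be delivered.
    std::size_t pending() const {
        return buffer_.size();
    }

private:
    std::size_t chunkSize_;
    Callback callback_;
    std::vector<ToyEvent> buffer_;
};

// Feeds three packets of 70 events into a slicer with chunk size 100
// and returns the sizes of the chunks that were delivered.
std::vector<std::size_t> demoCountSlicing() {
    std::vector<std::size_t> chunkSizes;
    ToyCountSlicer slicer(100, [&chunkSizes](const std::vector<ToyEvent> &chunk) {
        chunkSizes.push_back(chunk.size());
    });

    const std::vector<ToyEvent> packet(70, ToyEvent{0});
    for (int i = 0; i < 3; ++i) {
        slicer.accept(packet);
    }
    return chunkSizes;
}
```

In this toy model the packet size (70) and the chunk size (100) are unrelated: the first packet triggers no callback, the second and third each complete one chunk, and the last 10 events stay buffered until more data arrives.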
Adding jobs
Jobs can be added to the slicer at any time. In most cases, one would want to add the jobs to the slicer in the module constructor.
To add a job with fixed-time window chunks
int jobId = slicer.doEveryTimeInterval(timeIntervalInUs, callback);
To add a job with a fixed number of events
int jobId = slicer.doEveryNumberOfEvents(numberOfEvents, callback);
The callback can be of any callable type, e.g. lambda, std::function, bind expression, functor object, function pointer etc. It should satisfy the signature void(const dv::EventStore &), i.e. return nothing and take a const reference to an Event Store.
Both functions return an integer job id. It can be discarded if the job never needs to change. Keeping the job id makes it possible to modify the job parameters later or to remove the job again.
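The callable types listed above can be sketched without the SDK as follows. ToyEventStore is a hypothetical stand-in for dv::EventStore, and SliceCallback, seenEvents and demoCallables are illustrative names, not part of dv-sdk; the sketch assumes the callback is stored in something equivalent to a std::function with the matching signature.

```cpp
#include <cstddef>
#include <functional>
#include <vector>

// Hypothetical stand-in for dv::EventStore so the sketch compiles without dv-sdk.
struct ToyEventStore {
    std::size_t count;
};

// Same shape as the required signature: returns nothing, takes a const reference.
using SliceCallback = std::function<void(const ToyEventStore &)>;

// Shared counter that every callable below increments.
std::size_t seenEvents = 0;

// 1. Free function, usable as a function pointer.
void countWithFunction(const ToyEventStore &data) {
    seenEvents += data.count;
}

// 2. Functor object.
struct CountingFunctor {
    void operator()(const ToyEventStore &data) const {
        seenEvents += data.count;
    }
};

// 3. Member function, adapted with std::bind.
struct Accumulator {
    void add(const ToyEventStore &data) {
        seenEvents += data.count;
    }
};

// Invokes each kind of callable once with a chunk of 5 events
// and returns the total number of events seen.
std::size_t demoCallables() {
    seenEvents = 0;
    Accumulator accumulator;

    std::vector<SliceCallback> callbacks;
    callbacks.push_back(&countWithFunction);
    callbacks.push_back(CountingFunctor{});
    callbacks.push_back(std::bind(&Accumulator::add, &accumulator, std::placeholders::_1));
    callbacks.push_back([](const ToyEventStore &data) { seenEvents += data.count; });

    const ToyEventStore chunk{5};
    for (const auto &callback : callbacks) {
        callback(chunk);
    }
    return seenEvents;
}
```

When a callback captures this or a pointer to another object, as the bind expression does here, that object has to outlive the job that calls it.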
Modifying jobs
To modify the time interval of a time based job
slicer.modifyTimeInterval(jobId, newTimeInterval);
To modify the number of events of an event-count-based job
slicer.modifyNumberInterval(jobId, newNumberInterval);
Removing jobs
To remove a job
slicer.removeJob(jobId);
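The role of the job id across adding, modifying and removing can be pictured as a key into a table of jobs. The following is a toy sketch of that idea only: ToyJobRegistry and its methods are hypothetical and do not reflect how dv::EventStreamSlicer stores jobs, and the bool return values are an assumption of the sketch, since the text above does not say how the real slicer reacts to an unknown id.

```cpp
#include <cstddef>
#include <cstdint>
#include <map>

// Toy job table keyed by job id (hypothetical, for illustration only).
class ToyJobRegistry {
public:
    // Registers a job with the given interval (microseconds or event count)
    // and returns its id. Ids are never reused in this sketch.
    int addJob(std::int64_t interval) {
        const int id = nextId_++;
        intervals_[id] = interval;
        return id;
    }

    // Changes the interval of an existing job; returns false for an unknown id.
    bool modifyInterval(int id, std::int64_t newInterval) {
        const auto it = intervals_.find(id);
        if (it == intervals_.end()) {
            return false;
        }
        it->second = newInterval;
        return true;
    }

    // Removes a job; returns false if no job with this id exists.
    bool removeJob(int id) {
        return intervals_.erase(id) > 0;
    }

    bool hasJob(int id) const {
        return intervals_.count(id) > 0;
    }

    // Throws std::out_of_range for an unknown id.
    std::int64_t intervalOf(int id) const {
        return intervals_.at(id);
    }

    std::size_t size() const {
        return intervals_.size();
    }

private:
    std::map<int, std::int64_t> intervals_;
    int nextId_ = 0;
};
```

In a module, this is why the value returned when adding a job is typically stored in a member variable whenever the interval is expected to change at runtime.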