Writing new Stages
So you want to write your own Stages! This tutorial will walk you through the steps for building the foundations of your stage, building a simple pipeline, and unit-testing your stage.
Writing a Consumer Stage
Stages can have producer components and/or consumer components. To “consume” means that the stage reads frames from a Kotekan Buffer. To “produce” means that it writes frames into a Kotekan Buffer. This tutorial will illustrate writing and testing a stage that only has consumer components.
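To make the consume/produce vocabulary concrete, here is a minimal standalone sketch of the full/empty hand-off between a producer and a consumer. `ToyBuffer` and its methods are hypothetical names invented for this illustration; it is not Kotekan's Buffer class. It is single-threaded and returns a null pointer when a frame is not ready, whereas the Kotekan calls used later in this tutorial wait for a frame and return a null frame on shutdown.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for a buffer of frames. This is NOT Kotekan's Buffer
// class: it is single-threaded and never blocks.
class ToyBuffer {
public:
    ToyBuffer(std::size_t num_frames, std::size_t frame_size) :
        frames_(num_frames, std::vector<uint8_t>(frame_size, 0)),
        full_(num_frames, false) {}

    // Producer side: get a frame to write into, or nullptr if it is still full.
    uint8_t* get_empty_frame(std::size_t id) {
        assert(id < frames_.size());
        return full_[id] ? nullptr : frames_[id].data();
    }

    // Producer side: hand a written frame over to the consumer.
    void mark_frame_full(std::size_t id) {
        assert(id < frames_.size());
        full_[id] = true;
    }

    // Consumer side: get a frame to read from, or nullptr if it is still empty.
    const uint8_t* get_full_frame(std::size_t id) const {
        assert(id < frames_.size());
        return full_[id] ? frames_[id].data() : nullptr;
    }

    // Consumer side: give the frame back so the producer can reuse it.
    void mark_frame_empty(std::size_t id) {
        assert(id < frames_.size());
        full_[id] = false;
    }

private:
    std::vector<std::vector<uint8_t>> frames_; // frame storage
    std::vector<bool> full_;                   // true once a frame holds data
};
```

In this sketch a producer calls `get_empty_frame`, writes, then `mark_frame_full`; a consumer calls `get_full_frame`, reads, then `mark_frame_empty`. The stages below follow the same acquire, use, release pattern.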
The steps we will follow for developing the stage are:
1. Load the classes we will be using.
2. Register the stage with the stage factory.
3. Write a skeleton constructor. Within the constructor:
   - Register the stage as a consumer of in_buf.
4. Write the skeleton for the framework-managed pthread. Within this main thread:
   - Declare the pointer to the buffer.
   - Acquire the frame.
   - Handle what happens if a null frame is returned.
   - Release the frame.
   - Increase the ring pointer.
5. Create the header.
#include "ExampleConsumer.hpp"

#include "StageFactory.hpp" // for REGISTER_KOTEKAN_STAGE
#include "kotekanLogging.hpp" // for INFO
#include "visUtil.hpp" // for frameID, modulo

#include "fmt.hpp" // for compile_string_to_view

#include <functional> // for bind, function
#include <stdint.h> // for uint32_t, uint8_t

// Include the classes we will be using
using kotekan::bufferContainer;
using kotekan::Config;
using kotekan::Stage;

// Register the stage with the stage factory.
REGISTER_KOTEKAN_STAGE(ExampleConsumer);

/*
 * ExampleConsumer constructor. Note that you can instead use the macro
 *
 *     STAGE_CONSTRUCTOR(ExampleConsumer)
 *
 * which saves the boilerplate of the constructor signature.
 */
ExampleConsumer::ExampleConsumer(Config& config, const std::string& unique_name,
                                 bufferContainer& buffer_container) :
    Stage(config, unique_name, buffer_container, std::bind(&ExampleConsumer::main_thread, this)) {

    // Register as consumer of in_buf
    in_buf = get_buffer("in_buf");
    in_buf->register_consumer(unique_name);
}


ExampleConsumer::~ExampleConsumer() {}

// Framework managed pthread
void ExampleConsumer::main_thread() {
    // Logging function
    INFO("Reached main_thread!");

    // Ring buffer pointer
    frameID frame_id(in_buf);

    // Get the no. of elements in each frame
    uint32_t frame_length = in_buf->frame_size / sizeof(float);

    // Until the thread is stopped
    while (!stop_thread) {

        // Acquire frame
        uint8_t* frame = in_buf->wait_for_full_frame(unique_name, frame_id);
        // A null frame is returned on shutdown
        if (frame == nullptr)
            break;

        float* data = (float*)frame;

        // Logging
        INFO("{:s}[{:d}]: {:f}, ..., {:f}, ..., {:f}", in_buf->buffer_name, frame_id, data[0],
             data[frame_length / 2], data[frame_length - 1]);

        // Release frame
        in_buf->mark_frame_empty(unique_name, frame_id);

        // Increase the ring pointer
        frame_id++;
    }
}
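The "ring pointer" in the loop above is an index that steps through the frames of the buffer and starts again at the first frame after the last one. The following standalone sketch illustrates that wrap-around. `RingIndex` is a hypothetical class written for this illustration and is not Kotekan's `frameID`; it assumes the index wraps modulo the number of frames.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical ring index. It only illustrates wrap-around and is NOT
// Kotekan's frameID type.
class RingIndex {
public:
    explicit RingIndex(std::size_t num_frames) : num_frames_(num_frames), value_(0) {
        assert(num_frames > 0);
    }

    // Post-increment: return the current index, then advance with wrap-around.
    std::size_t operator++(int) {
        const std::size_t old = value_;
        value_ = (value_ + 1) % num_frames_;
        return old;
    }

    // Read the current index.
    std::size_t get() const {
        return value_;
    }

private:
    std::size_t num_frames_; // number of frames in the ring
    std::size_t value_;      // current position, always < num_frames_
};
```

With three frames, successive increments visit 0, 1, 2 and then return to 0, so a stage can keep calling its acquire and release functions with the same index variable indefinitely.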
Now, let us create the header file.
#ifndef EXAMPLE_CONSUMER_H
#define EXAMPLE_CONSUMER_H

#include "Config.hpp" // for Config
#include "Stage.hpp" // for Stage
#include "buffer.hpp" // for Buffer
#include "bufferContainer.hpp" // for bufferContainer

#include <string> // for string

/**
 * @class ExampleConsumer
 * @brief An example consumer stage to print the contents of a buffer.
 *
 * @par Buffers
 * @buffer in_buf The buffer to process the contents of.
 *     @buffer_format any
 *     @buffer_metadata any
 *
 */
class ExampleConsumer : public kotekan::Stage {
public:
    /**
     * @brief Constructor for the stage
     *   Note: you can use the macro STAGE_CONSTRUCTOR(ExampleConsumer)
     *   if your constructor does not need additional customisation
     *   and you wish to hide the complexity.
     */
    ExampleConsumer(kotekan::Config& config, const std::string& unique_name,
                    kotekan::bufferContainer& buffer_container);

    /**
     * @brief Destructor - called when Kotekan shuts down.
     */
    virtual ~ExampleConsumer();

    /**
     * @brief Framework managed pthread.
     */
    void main_thread() override;

private:
    // Input buffer
    Buffer* in_buf;
};

#endif /* EXAMPLE_CONSUMER_H */
To include the stage in the build, add it to lib/stages/CMakeLists.txt.
The build instructions can then be found at Compiling Kotekan.
Writing a Producer Stage
A Kotekan “producer” writes data into a Kotekan Buffer. This section demonstrates how to write a stage that only produces data.
The steps are:
1. Load the classes we will be using.
2. Register the stage with the stage factory.
3. Write a skeleton constructor. Within the constructor:
   - Register the stage as a producer of out_buf.
   - Load some configuration options.
4. Write the skeleton for the framework-managed pthread. Within this main thread:
   - Declare the pointer to the buffer.
   - Acquire the frame.
   - Handle what happens if a null frame is returned.
   - Fill the frame with data.
   - Release the frame.
   - Increase the ring pointer.
5. Create the header.
#include "ExampleProducer.hpp"

#include <stdint.h> // for uint32_t, uint8_t
#include <functional> // for bind, function

#include "StageFactory.hpp" // for REGISTER_KOTEKAN_STAGE
#include "kotekanLogging.hpp" // for INFO
#include "visUtil.hpp" // for frameID, modulo
#include "fmt.hpp" // for compile_string_to_view

// Include the classes we will be using
using kotekan::bufferContainer;
using kotekan::Config;
using kotekan::Stage;

// Register the stage with the stage factory.
REGISTER_KOTEKAN_STAGE(ExampleProducer);

ExampleProducer::ExampleProducer(Config& config, const std::string& unique_name,
                                 bufferContainer& buffer_container) :
    Stage(config, unique_name, buffer_container, std::bind(&ExampleProducer::main_thread, this)) {

    // Register as producer of out_buf
    out_buf = get_buffer("out_buf");
    out_buf->register_producer(unique_name);

    // Load options that can be set in config
    // The arguments to config.get_default are the:
    //    unique_name_for_stage, name_of_config, default_value_if_not_set
    _init_value = config.get_default<float>(unique_name, "init_value", 0.f);
}


ExampleProducer::~ExampleProducer() {}

// Framework managed pthread
void ExampleProducer::main_thread() {

    // Ring buffer pointer
    frameID frame_id(out_buf);

    // Get the no. of elements in each frame
    uint32_t frame_length = out_buf->frame_size / sizeof(float);

    // Until the thread is stopped
    while (!stop_thread) {

        // Acquire frame
        uint8_t* frame = out_buf->wait_for_empty_frame(unique_name, frame_id);
        // A null frame is returned on shutdown
        if (frame == nullptr)
            break;

        float* data = (float*)frame;

        for (uint32_t i = 0; i < frame_length; i++) {
            data[i] = _init_value;
        }

        INFO("{:s}[{:d}] initialised to: {:f}, ..., {:f}, ..., {:f}", out_buf->buffer_name,
             frame_id, data[0], data[frame_length / 2], data[frame_length - 1]);

        // Release frame
        out_buf->mark_frame_full(unique_name, frame_id);

        // Increase the ring pointer
        frame_id++;
    }
}
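The constructor above reads `init_value` with a fallback default. As a rough illustration of that "use the configured value if present, otherwise the default" behaviour, here is a standalone sketch. `ToyConfig` and `toy_get_default` are hypothetical names for this example, and the flat `"<unique_name>/<option>"` key layout is an assumption made only for the sketch; Kotekan's own `Config` class may resolve options differently.

```cpp
#include <cassert>
#include <map>
#include <string>

// Hypothetical flat configuration: keys are "<unique_name>/<option>".
// This is an illustration only, NOT Kotekan's Config class.
using ToyConfig = std::map<std::string, float>;

// Return the configured value for (unique_name, name), or default_value
// when the option has not been set.
float toy_get_default(const ToyConfig& config, const std::string& unique_name,
                      const std::string& name, float default_value) {
    assert(!name.empty());
    const auto it = config.find(unique_name + "/" + name);
    return it != config.end() ? it->second : default_value;
}
```

A stage whose option is present in the map gets the configured value, and any other stage silently falls back to the default, which is why the default should be a safe choice.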
Now, let us create the header file.
#ifndef EXAMPLE_PRODUCER_H
#define EXAMPLE_PRODUCER_H
#include "Config.hpp" // for Config
#include "Stage.hpp" // for Stage
#include "buffer.hpp" // for Buffer
#include "bufferContainer.hpp" // for bufferContainer

#include <string> // for string

/**
 * @class ExampleProducer
 * @brief An example producer stage that sets each element of a buffer to a constant value.
 *
 * @par Buffers
 * @buffer out_buf The buffer to write the constant values into.
 *     @buffer_format any
 *     @buffer_metadata any
 *
 * @conf init_value Default 0. The value to set each element to.
 */
class ExampleProducer : public kotekan::Stage {
public:
    /**
     * @brief Constructor for the stage
     *   Note: you can use the macro STAGE_CONSTRUCTOR(ExampleProducer)
     *   if your constructor does not need additional customisation
     *   and you wish to hide the complexity.
     */
    ExampleProducer(kotekan::Config& config, const std::string& unique_name,
                    kotekan::bufferContainer& buffer_container);

    /**
     * @brief Destructor - called on shutdown, after @c main_thread has exited.
     */
    virtual ~ExampleProducer();

    /**
     * @brief Framework managed pthread.
     */
    void main_thread() override;

private:
    // Output buffer
    Buffer* out_buf;

    // Initialised value
    float _init_value;
};

#endif /* EXAMPLE_PRODUCER_H */
To include the stage in the build, add it to lib/stages/CMakeLists.txt.
Writing a Producer/Consumer Stage
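Next, we will create a Kotekan Stage that consumes two input buffers, representing vectors A and B, and multiplies them element by element. This example calls the operation a "dot product", but note that the result is a vector of the same length as the inputs, not a single scalar. The stage writes that result to an output buffer.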
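To make the distinction explicit, the following standalone sketch contrasts the elementwise product that this stage computes with the scalar dot product from linear algebra. Both function names are hypothetical and exist only for this comparison; they are not part of Kotekan.

```cpp
#include <cassert>
#include <cstddef>
#include <numeric>
#include <vector>

// Elementwise product: out[i] = a[i] * b[i]. This is what the stage computes.
std::vector<float> elementwise_product(const std::vector<float>& a,
                                       const std::vector<float>& b) {
    assert(a.size() == b.size());
    std::vector<float> out(a.size());
    for (std::size_t i = 0; i < a.size(); i++) {
        out[i] = a[i] * b[i];
    }
    return out;
}

// Scalar dot product: the sum of a[i] * b[i] over all i. Shown for contrast.
float scalar_dot_product(const std::vector<float>& a, const std::vector<float>& b) {
    assert(a.size() == b.size());
    return std::inner_product(a.begin(), a.end(), b.begin(), 0.0f);
}
```

For A = (1, 2, 3) and B = (4, 5, 6), the elementwise product is (4, 10, 18), while the scalar dot product is 32. Because the stage produces one output element per input element, its output frame must be the same size as its input frames.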
The header and source files (ExampleDotProduct.hpp/cpp) are shown below.
#ifndef EXAMPLE_DOT_PRODUCT_STAGE_H
#define EXAMPLE_DOT_PRODUCT_STAGE_H

#include "Config.hpp" // for Config
#include "Stage.hpp" // for Stage
#include "buffer.hpp" // for Buffer
#include "bufferContainer.hpp" // for bufferContainer

#include <string> // for string

/**
 * @class ExampleDotProduct
 * @brief A stage to compute the elementwise product of two vectors, A and B, which are
 *        represented by two buffers. The result is written to an output buffer.
 *
 * @par Buffers
 * @buffer in_a_buf The input buffer representing vector A.
 *     @buffer_format Array of floats
 *     @buffer_metadata none
 * @buffer in_b_buf The input buffer representing vector B.
 *     @buffer_format Array of floats
 *     @buffer_metadata any
 * @buffer out_buf The output buffer to hold the result.
 *     @buffer_format Array of floats
 *     @buffer_metadata any
 */
class ExampleDotProduct : public kotekan::Stage {
public:
    ExampleDotProduct(kotekan::Config& config, const std::string& unique_name,
                      kotekan::bufferContainer& buffer_container);
    virtual ~ExampleDotProduct();
    void main_thread() override;

private:
    // Input buffers
    Buffer* in_a_buf;
    Buffer* in_b_buf;

    // Output buffer
    Buffer* out_buf;
};

#endif /* EXAMPLE_DOT_PRODUCT_STAGE_H */
#include "ExampleDotProduct.hpp"

#include <stdint.h> // for uint8_t, uint32_t
#include <stdexcept> // for runtime_error

#include "StageFactory.hpp" // for REGISTER_KOTEKAN_STAGE
#include "kotekanLogging.hpp" // for INFO
#include "visUtil.hpp" // for frameID, modulo
#include "fmt.hpp" // for compile_string_to_view, format, fmt

// Include the classes we will be using
using kotekan::bufferContainer;
using kotekan::Config;
using kotekan::Stage;

// Register the stage with the stage factory.
REGISTER_KOTEKAN_STAGE(ExampleDotProduct);

STAGE_CONSTRUCTOR(ExampleDotProduct) {

    // Register as consumer of in_a_buf and in_b_buf
    in_a_buf = get_buffer("in_a_buf");
    in_a_buf->register_consumer(unique_name);

    in_b_buf = get_buffer("in_b_buf");
    in_b_buf->register_consumer(unique_name);

    // Register as a producer of out_buf
    out_buf = get_buffer("out_buf");
    out_buf->register_producer(unique_name);

    // Ensure the input buffers are the same length
    if (in_a_buf->frame_size != in_b_buf->frame_size) {
        throw std::runtime_error(
            fmt::format(fmt("in_a_buf frame size does not match in_b_buf frame size. {:d} != {:d}"),
                        in_a_buf->frame_size, in_b_buf->frame_size));
    }

    // Ensure the output buffer length matches the input buffer lengths
    if (in_a_buf->frame_size != out_buf->frame_size) {
        throw std::runtime_error(
            fmt::format(fmt("Input frame size does not match output frame size. {:d} != {:d}"),
                        in_a_buf->frame_size, out_buf->frame_size));
    }
}

ExampleDotProduct::~ExampleDotProduct() {}

void ExampleDotProduct::main_thread() {

    INFO("Starting main_thread!");

    // Buffer indices
    frameID in_a_frame_id(in_a_buf);
    frameID in_b_frame_id(in_b_buf);
    frameID out_frame_id(out_buf);

    // Length of vectors
    uint32_t frame_length = in_a_buf->frame_size / sizeof(float);

    // Until the thread is stopped
    while (!stop_thread) {

        // Acquire input frames
        uint8_t* frame_a_ptr = in_a_buf->wait_for_full_frame(unique_name, in_a_frame_id);
        // A null frame is returned on shutdown
        if (frame_a_ptr == nullptr)
            break;
        uint8_t* frame_b_ptr = in_b_buf->wait_for_full_frame(unique_name, in_b_frame_id);
        if (frame_b_ptr == nullptr)
            break;

        // Wait for a new output frame
        uint8_t* out_frame_ptr = out_buf->wait_for_empty_frame(unique_name, out_frame_id);
        if (out_frame_ptr == nullptr)
            break;

        // Cast pointers to float arrays
        float* a = (float*)frame_a_ptr;
        float* b = (float*)frame_b_ptr;
        float* output = (float*)out_frame_ptr;

        // Compute the elementwise product of A and B
        for (uint32_t i = 0; i < frame_length; i++) {
            output[i] = a[i] * b[i];
        }

        // Release the input frames and increment the frame indices
        in_a_buf->mark_frame_empty(unique_name, in_a_frame_id++);
        in_b_buf->mark_frame_empty(unique_name, in_b_frame_id++);

        // Release the output frame and increment the output frame index
        out_buf->mark_frame_full(unique_name, out_frame_id++);
    }
}
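One way to make the arithmetic in `main_thread` easy to unit-test is to factor it into a free function that works on raw frame pointers, so it can be exercised without a running pipeline. The sketch below shows that idea under stated assumptions: `multiply_frames` is a hypothetical helper, not part of Kotekan, and it assumes each frame holds a whole number of floats and is suitably aligned for `float` access.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical helper (not part of Kotekan): multiply two byte frames,
// interpreted as float arrays, element by element into an output frame.
// frame_size is the size of each frame in bytes.
void multiply_frames(const uint8_t* frame_a, const uint8_t* frame_b, uint8_t* frame_out,
                     std::size_t frame_size) {
    // Assumption: a frame holds a whole number of floats.
    assert(frame_size % sizeof(float) == 0);
    const std::size_t frame_length = frame_size / sizeof(float);

    // View the byte frames as float arrays, as the stage does.
    const float* a = reinterpret_cast<const float*>(frame_a);
    const float* b = reinterpret_cast<const float*>(frame_b);
    float* out = reinterpret_cast<float*>(frame_out);

    for (std::size_t i = 0; i < frame_length; i++) {
        out[i] = a[i] * b[i];
    }
}
```

A test can then fill two small float arrays, pass them in as byte pointers together with their size in bytes, and compare the output against hand-computed values. The stage's loop body would reduce to a single call to such a helper between acquiring and releasing the frames.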
Creating a Pipeline
See the pipeline example in the User Guide (user_pipeline_example) for how to create a pipeline that uses these example producer, consumer, and dot-product (producer/consumer) stages.