Process Kafka Events Using MATLAB
This example shows how to use the Streaming Data Framework for MATLAB® Production Server™ to process events from a Kafka® stream. The example provides and explains two components. The recamanSum and initRecamanSum streaming analytic functions process event streams. The demoRecaman script creates event streams, validates event stream creation, uses the streaming analytic function to process the event streams, and writes the results to an output stream.
The example functions and script are located in the
is the root
folder of support packages on your system. To get the path to this folder, use this
You must have Streaming Data Framework for MATLAB Production Server installed on your system. For more information, see Install Streaming Data Framework for MATLAB Production Server.
You must have a running Kafka server where you have the necessary permissions to create topics. The example assumes that the network address of your Kafka host is kafka.host.com:9092.
Write Streaming Analytic MATLAB Function
For this example, use the sample MATLAB functions recamanSum and initRecamanSum.
Later, you iterate the
recamanSum streaming function over several events
to compute results.
Write Stateful Function
The recamanSum function is stateful. In stateful functions, the data state is shared between events, and past events can influence the way current events are processed.
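The recamanSum function computes the cumulative sum of a numeric sequence in stream variable R and returns the timetable cSum and the structure state. The timetable cSum contains the cumulative sum of the elements in R along with their timestamps and keys. The structure state stores the last cumulative sum in its field cumsum, so that processing of the next window of events continues the running total from that value.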
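function [cSum, state] = recamanSum(data, state)
    % Row times and keys of the current window of events
    timestamp = data.Properties.RowTimes;
    key = data.key;
    % Cumulative sum of this window, offset by the total carried in state
    sum = cumsum(data.R) + state.cumsum;
    % Save the last cumulative sum for the next window
    state.cumsum = sum(end);
    cSum = timetable(timestamp, key, sum);
end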
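To see why carrying state.cumsum between calls matters, the following sketch repeats the arithmetic of recamanSum on a plain vector, split into windows, without Kafka or timetables. The window size of 3 and the input (the first nine terms of Recamán's sequence) are illustrative assumptions, not values the framework uses. The check at the end confirms that window-by-window processing gives the same result as one cumulative sum over the whole sequence.

```matlab
% First nine terms of Recamán's sequence, used as illustrative input
R = [0 1 3 6 2 7 13 20 12]';
windowSize = 3;                 % hypothetical window size for this sketch

state.cumsum = 0;               % same initial state that initRecamanSum returns
windowed = zeros(size(R));      % cumulative sums computed window by window
for first = 1:windowSize:numel(R)
    idx = first:min(first + windowSize - 1, numel(R));
    % Same update that recamanSum applies to each window of data.R
    s = cumsum(R(idx)) + state.cumsum;
    state.cumsum = s(end);      % carry the running total to the next window
    windowed(idx) = s;
end

% Processing in windows matches one pass over all of R
disp(isequal(windowed, cumsum(R)))   % displays 1
```

If state.cumsum were reset to 0 for every window, each window would restart its sum and the values after the first window would be wrong.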
Write State Initialization Function
The initRecamanSum function initializes the state for the first iteration of the recamanSum function.
function state = initRecamanSum(config)
    % Start the running total at zero; config is not used in this example
    state.cumsum = 0;
end
Create Sample Stream Events
To run the example, you need sample streaming data. The demoRecaman script contains the following code, which creates streaming data consisting of the first 1000 elements of Recamán's sequence and writes the sequence to a Kafka topic.
Set the Kafka hostname and port number.
kafkaHost = "kafka.host.com";
kafkaPort = 9092;
Create the first 1000 elements of Recamán's sequence.
To create the sequence, you can use the following recamanTimeTable function, which is also located in the example folder. The recamanTimeTable function creates a timetable containing the first N elements of Recamán's sequence.
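function tt = recamanTimeTable(N)
    % Compute the first N terms of Recamán's sequence; rs(1) = 0
    rs = zeros(1,N);
    for k=2:N
        n = k-1;
        subtract = rs(k-1) - n;
        % Step back if the result is positive and not already in the sequence
        if subtract > 0 && any(rs == subtract) == false
            rs(k) = subtract;
        else
            rs(k) = rs(k-1) + n;
        end
    end
    % One timestamp per term, spaced one second apart and ending at the
    % current time, in UTC
    incr = seconds(1:N);
    thisVeryInstant = ...
        convertTo(datetime, "epochtime", "Epoch", "1970-1-1");
    thisVeryInstant = datetime(thisVeryInstant, "ConvertFrom",...
        "epochtime", "Epoch", "1970-1-1");
    thisVeryInstant.TimeZone = "UTC";
    timestamp = (thisVeryInstant - seconds(N)) + incr';
    % Use the zero-based term index, as a string, as the event key
    key = (0:N-1)';
    key = string(key);
    R = rs';
    tt = timetable(timestamp,R,key);
end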
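As a quick check of the recurrence that recamanTimeTable implements, the following sketch computes the first 10 terms with the same rule. It is an illustrative variant, not part of the shipped example: it uses ismember on the terms computed so far, rs(1:k-1), instead of any(rs == subtract). The two tests agree because the unfilled entries of rs are zero and the candidate value is always positive when it is tested.

```matlab
N = 10;
rs = zeros(1, N);               % rs(1) holds the first term, 0
for k = 2:N
    n = k - 1;                  % rs(k) holds the term with index n
    candidate = rs(k-1) - n;
    % Step back only if the result is positive and not already in the sequence
    if candidate > 0 && ~ismember(candidate, rs(1:k-1))
        rs(k) = candidate;
    else
        rs(k) = rs(k-1) + n;
    end
end
disp(rs)   % displays the terms 0 1 3 6 2 7 13 20 12 21
```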
Store the results of recamanTimeTable in the timetable tt0.
tt0 = recamanTimeTable(1000);
Create a stream object connected to the recamanSum_data topic. Later, you write the timetable that contains the Recamán sequence to this topic.
dataKS = kafkaStream(kafkaHost, kafkaPort, "recamanSum_data", Rows=100);
If the recamanSum_data topic already exists, delete it.
try
    deleteTopic(dataKS);
catch
    % Ignore the error raised when the topic does not exist yet
end
Write the entire Recamán sequence to the recamanSum_data topic.
Validate Sample Data Creation
To validate the sample stream events that you created, confirm that the first 100 rows
that you read from the
recamanSum_data topic are the same as the sample
data you created and wrote to the
recamanSum_data topic. The
demoRecaman script contains the following code.
Read one window of data (100 rows) from the recamanSum_data topic into a timetable.
tt1 = readtimetable(dataKS);
Check whether the data read into tt1 is equal to the first 100 elements of the Recamán sequence that you wrote.
if isequal(tt0(1:height(tt1),:), tt1)
    fprintf(1,"Success writing data to topic %s.\n", dataKS.Name);
end
Stop reading from the dataKS stream, because later you use dataKS to read from the recamanSum_data topic again. Reading from the same topic using multiple streams is not permitted.
Process Stream Events with Streaming Analytic Function
Iterate the recamanSum streaming analytic function multiple times to read the numeric sequence from the input stream, compute its cumulative sum, and write the results to the output stream. The demoRecaman script contains the following code.
Create an output stream connected to the recamanSum_results topic to store the output of the recamanSum function.
resultKS = kafkaStream(kafkaHost,kafkaPort,"recamanSum_results", ...
    Rows=100);
Create an event stream processor to iterate the recamanSum streaming function over the input topic connected to the stream dataKS. Write the results to the output topic connected to the stream resultKS. Use a persistent storage connection named RR to store the data state between iterations.
rsp = eventStreamProcessor(dataKS,@recamanSum,@initRecamanSum,...
    StateStore="RR",OutputStream=resultKS);
Execute the streaming function ten times. Because the window size (the number of rows read at a time) is 100, ten iterations consume the entire sequence of 1000 elements.
fprintf(1,"Computing cumulative sum of Recaman sequence.\n");
execute(rsp, 10);
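The iteration count passed to execute follows from the window arithmetic: the number of events divided by the rows per window, rounded up. The sketch below is illustrative only; the variable names are hypothetical, and it assumes that each iteration reads one window of at most rowsPerWindow rows.

```matlab
numEvents = 1000;       % elements written to the recamanSum_data topic
rowsPerWindow = 100;    % Rows value passed to kafkaStream
% ceil accounts for a final partial window when numEvents is not a multiple
% of rowsPerWindow
numIterations = ceil(numEvents / rowsPerWindow);
fprintf("Windows needed: %d\n", numIterations);   % prints "Windows needed: 10"
```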
Delete the event stream processor. Deleting it shuts down the StateStore connection, which you must do to run this script more than once in a row.
Read the results from the output stream.
fprintf(1,"Reading results from %s.\n", resultKS.Name);
% Read ten windows of 100 rows from the output stream
tt2 = timetable.empty;
for n = 1:10
    tt2 = [ tt2 ; readtimetable(resultKS) ];
end
% Compare the last streamed sum with a direct cumulative sum of the input
cSum = cumsum(tt0.R);
if tt2(end,:).sum == cSum(end)
    fprintf(1,"Cumulative sum computed successfully: %d.\n", ...
        tt2(end,:).sum);
else
    fprintf(1,"Expected cumulative sum %d. Computed %d instead.\n", ...
        cSum(end), tt2(end,:).sum);
end
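The reading loop above grows tt2 by concatenation on every pass. A common MATLAB alternative, shown here as a sketch, collects each window in a preallocated cell array and concatenates once at the end. Because the sketch cannot connect to Kafka, it builds hypothetical 100-row timetables in place of readtimetable(resultKS); the start time t0 and the row values are illustrative assumptions.

```matlab
numWindows = 10;
windows = cell(numWindows, 1);                  % one cell per window read
t0 = datetime(2024, 1, 1, "TimeZone", "UTC");   % arbitrary start time
for n = 1:numWindows
    % Stand-in for readtimetable(resultKS): 100 rows per window
    rows = (n-1)*100 + (1:100)';
    timestamp = t0 + seconds(rows);
    windows{n} = timetable(timestamp, rows, 'VariableNames', {'sum'});
end
% Concatenate once instead of growing tt2 inside the loop
tt2 = vertcat(windows{:});
fprintf("tt2 has %d rows.\n", height(tt2));     % prints "tt2 has 1000 rows."
```

For ten windows the difference is negligible; the pattern mainly matters when a loop reads many windows.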
When you run the entire demoRecaman script, you see the following output:
Success writing data to topic recamanSum_data.
Computing cumulative sum of Recaman sequence.
Reading results from recamanSum_results.
Cumulative sum computed successfully: 837722.