Imagine the following JSON message arrives on the input topic:
Message Key: “vehicle-mclaren-mp412c”
Message Value:
{
"GPS": [{
"Speed": 89.3,
"SpeedUnit": "KMH",
"LAT": 45.2423434,
"LONG" : 12.12121
}],
"Driver": "Alonso"
}
We want to produce the following output message:
{
"GPS" : {
"Speed": 55.5,
"SpeedUnit" : "MPH",
"LAT": 45.2423434,
"LONG" : 12.12121
},
"Driver": "Alonso",
"Speeding" : true
}
using QuixStreams.Streaming;
using QuixStreams.Streaming.Models;
// Create a client which holds generic details for creating input and output topics
var client = new QuixStreamingClient();

// Topic names are read from the "input" and "output" environment variables.
// GetEnvironmentVariable returns null when a variable is not set, so fail fast
// with a clear message instead of handing null to the client.
var inputTopicName = Environment.GetEnvironmentVariable("input")
    ?? throw new InvalidOperationException("Environment variable 'input' is not set.");
var outputTopicName = Environment.GetEnvironmentVariable("output")
    ?? throw new InvalidOperationException("Environment variable 'output' is not set.");

// Both are declared with `using` so they are disposed when they go out of scope.
using var producer = client.GetTopicProducer(outputTopicName);
using var consumer = client.GetTopicConsumer(inputTopicName);
// Rewrites every row of the stream so that GPS.Speed is expressed in miles per hour
// and adds a "Speeding" flag, then yields the modified row.
//
// stream: the per-car stream whose Timeseries rows are processed.
// Yields: each input row after its GPS.Speed, GPS.SpeedUnit and Speeding values are set.
//
// NOTE(review): an async method that uses `yield return` must return IAsyncEnumerable<T>;
// the element type is assumed to be TimeseriesDataTimestamp - confirm against the final API.
async IAsyncEnumerable<TimeseriesDataTimestamp> FilterLowSpeed(Stream stream)
{
    // 1 mile is exactly 1.609344 km.
    const double KilometresPerMile = 1.609344;
    const double SpeedLimitMph = 50;

    Console.WriteLine("New car: " + stream.StreamKey);
    await foreach (var row in stream.Timeseries)
    {
        var speed = row["GPS.Speed"].Value<double>();
        var unit = row["GPS.SpeedUnit"].Value<string>();

        // The sample message carries "KMH" in upper case, so the unit is compared
        // case-insensitively; any other unit is passed through unchanged.
        var isKmh = string.Equals(unit, "KMH", StringComparison.OrdinalIgnoreCase);
        var speedMph = isKmh ? speed / KilometresPerMile : speed;

        row["GPS.Speed"] = speedMph;
        row["GPS.SpeedUnit"] = "MPH";

        // Evaluate the limit on the converted value so the threshold means the same
        // thing regardless of the unit the message arrived in.
        row["Speeding"] = speedMph > SpeedLimitMph;
        yield return row;
    }
}
// Pipeline: group the consumed data by "car_id", pass it through FilterLowSpeed
// and sink the result into the producer.
// NOTE(review): the sample message has no "car_id" field (the car is identified by the
// message key) - confirm what GroupBy is expected to key on.
consumer
    .GroupBy("car_id")
    .FilterStream(FilterLowSpeed)
    .Sink(producer);

App.Run();
You can also specify custom deserialization for incoming messages by passing a deserializer function to the consumer:
using QuixStreams.Streaming;
using QuixStreams.Streaming.Models;
// Create a client which holds generic details for creating input and output topics
var client = new QuixStreamingClient();

// Topic names are read from the "input" and "output" environment variables.
// GetEnvironmentVariable returns null when a variable is not set, so fail fast
// with a clear message instead of handing null to the client.
var inputTopicName = Environment.GetEnvironmentVariable("input")
    ?? throw new InvalidOperationException("Environment variable 'input' is not set.");
var outputTopicName = Environment.GetEnvironmentVariable("output")
    ?? throw new InvalidOperationException("Environment variable 'output' is not set.");
// Custom deserialiser handed to the topic consumer: turns a raw payload into a
// TimeseriesDataTimestamp.
//
// payload: the raw message value as received.
// Returns: the row built from the payload.
//
// NOTE(review): the construction arguments are intentionally left as a placeholder in
// this sketch - fill them in once the payload format is fixed.
TimeseriesDataTimestamp Deserialise(object payload)
{
    return new TimeseriesDataTimestamp(/* ... build the row from payload ... */);
}
// Producer for the output topic; `using` disposes it when it goes out of scope.
using var producer = client.GetTopicProducer(outputTopicName);
// Consumer for the input topic, created with Deserialise as its second argument
// (presumably applied to each incoming payload - confirm against the consumer API).
using var consumer = client.GetTopicConsumer(inputTopicName, Deserialise);
// Rewrites every record of the stream so that GPS.Speed is expressed in miles per hour
// and adds a "Speeding" flag, then yields the modified record.
//
// stream: the per-car stream whose Records are processed.
// Yields: each input record after its GPS.Speed, GPS.SpeedUnit and Speeding values are set.
//
// NOTE(review): an async method that uses `yield return` must return IAsyncEnumerable<T>;
// the element type is assumed to be TimeseriesDataTimestamp - confirm against the final API.
async IAsyncEnumerable<TimeseriesDataTimestamp> FilterLowSpeed(Stream stream)
{
    // 1 mile is exactly 1.609344 km.
    const double KilometresPerMile = 1.609344;
    const double SpeedLimitMph = 50;

    Console.WriteLine("New car: " + stream.StreamKey);
    await foreach (var row in stream.Records)
    {
        var speed = row["GPS.Speed"].Value<double>();
        var unit = row["GPS.SpeedUnit"].Value<string>();

        // The sample message carries "KMH" in upper case, so the unit is compared
        // case-insensitively; any other unit is passed through unchanged.
        var isKmh = string.Equals(unit, "KMH", StringComparison.OrdinalIgnoreCase);
        var speedMph = isKmh ? speed / KilometresPerMile : speed;

        row["GPS.Speed"] = speedMph;
        row["GPS.SpeedUnit"] = "MPH";

        // Evaluate the limit on the converted value so the threshold means the same
        // thing regardless of the unit the message arrived in.
        row["Speeding"] = speedMph > SpeedLimitMph;
        yield return row;
    }
}
// Pipeline: group the consumed data by "car_id", pass it through FilterLowSpeed
// and sink the result into the producer.
// NOTE(review): the sample message has no "car_id" field (the car is identified by the
// message key) - confirm what GroupBy is expected to key on.
consumer
    .GroupBy("car_id")
    .FilterStream(FilterLowSpeed)
    .Sink(producer);

App.Run();