Imagine a JSON message arrives on the input topic in the following format:

Input topic

Message Key: “vehicle-mclaren-mp412c”

Message Value:

{
	"GPS": [{
			"Speed": 89.3,
			"SpeedUnit": "KMH",
		  "LAT": 45.2423434,
			"LONG" : 12.12121	
	}],
  "Driver": "Alonso"
}

Output topic

We want to produce the following message:

{
	"GPS" : {
			"Speed": 55.8,
			"SpeedUnit" : "MPH",
		  "LAT": 45.2423434,
			"LONG" : 12.12121
	},
  "Driver": "Alonso",
  "Speeding" : true
}

The following pipeline performs this transformation:

using QuixStreams.Streaming;
using QuixStreams.Streaming.Models;

// Create a client which holds generic details for creating input and output topics
var client = new QuixStreamingClient();

// Topic names are read from the environment. GetEnvironmentVariable returns null
// for an unset variable, so fail fast with a clear message instead of passing
// null on to the topic factories.
var inputTopicName = Environment.GetEnvironmentVariable("input")
    ?? throw new InvalidOperationException("Environment variable 'input' is not set.");
var outputTopicName = Environment.GetEnvironmentVariable("output")
    ?? throw new InvalidOperationException("Environment variable 'output' is not set.");

// Declared with `using` so both are disposed when they go out of scope.
using var producer = client.GetTopicProducer(outputTopicName);
using var consumer = client.GetTopicConsumer(inputTopicName);

// Normalises the speed of every timeseries row in a stream to miles per hour
// and flags rows whose normalised speed exceeds 50.
//
// stream:  the stream whose Timeseries rows are read and rewritten.
// returns: the rewritten rows, yielded one at a time as they are read.
//
// NOTE(review): the row type is assumed to be TimeseriesDataTimestamp — confirm
// against the final API.
async IAsyncEnumerable<TimeseriesDataTimestamp> FilterLowSpeed(Stream stream)
{
    Console.WriteLine("New car: " + stream.StreamKey);

    await foreach (var row in stream.Timeseries)
    {
        var speed = row["GPS.Speed"].Value<double>();
        var unit = row["GPS.SpeedUnit"].Value<string>();

        // The sample payload sends the unit as "KMH", so the comparison must
        // ignore case or the conversion never runs.
        var isKmh = string.Equals(unit, "kmh", StringComparison.OrdinalIgnoreCase);
        var speedMph = isKmh ? speed / 1.6 : speed;

        row["GPS.Speed"] = speedMph;
        row["GPS.SpeedUnit"] = "MPH";
        // Compare the normalised value so the threshold is always applied in MPH.
        row["Speeding"] = speedMph > 50;
        yield return row;
    }
}

// Group incoming messages, pass each group through FilterLowSpeed and write the
// results to the output topic.
// NOTE(review): the sample message has no "car_id" field — confirm which key
// GroupBy should use.
consumer
    .GroupBy("car_id")
    .FilterStream(FilterLowSpeed)
    .Sink(producer);

// Start the application. NOTE(review): presumably blocks until shutdown — confirm.
App.Run();

Custom serialisation

You can supply a custom deserialisation function when creating the topic consumer:

using QuixStreams.Streaming;
using QuixStreams.Streaming.Models;

// Create a client which holds generic details for creating input and output topics
var client = new QuixStreamingClient();

// Topic names are read from the environment. GetEnvironmentVariable returns null
// for an unset variable, so fail fast with a clear message instead of carrying
// a null topic name forward.
var inputTopicName = Environment.GetEnvironmentVariable("input")
    ?? throw new InvalidOperationException("Environment variable 'input' is not set.");
var outputTopicName = Environment.GetEnvironmentVariable("output")
    ?? throw new InvalidOperationException("Environment variable 'output' is not set.");

// Custom deserialiser handed to GetTopicConsumer: turns one raw incoming
// payload into a TimeseriesDataTimestamp row.
//
// payload: the raw message value as received from the topic.
// returns: the row built from the payload.
TimeseriesDataTimestamp Deserialise(object payload)
{
    // Placeholder from the original sketch: fill in the constructor arguments
    // with the values parsed from `payload`.
    return new TimeseriesDataTimestamp(/* ... */);
}

// Producer for the output topic; `using` disposes it when it goes out of scope.
using var producer = client.GetTopicProducer(outputTopicName);
// Consumer for the input topic; Deserialise is passed as the custom deserialiser.
using var consumer = client.GetTopicConsumer(inputTopicName, Deserialise);

// Normalises the speed of every record in a stream to miles per hour and flags
// records whose normalised speed exceeds 50.
//
// stream:  the stream whose Records are read and rewritten.
// returns: the rewritten records, yielded one at a time as they are read.
//
// NOTE(review): the record type is assumed to be TimeseriesDataTimestamp (the
// type Deserialise returns) — confirm against the final API.
async IAsyncEnumerable<TimeseriesDataTimestamp> FilterLowSpeed(Stream stream)
{
    Console.WriteLine("New car: " + stream.StreamKey);

    await foreach (var row in stream.Records)
    {
        var speed = row["GPS.Speed"].Value<double>();
        var unit = row["GPS.SpeedUnit"].Value<string>();

        // The sample payload sends the unit as "KMH", so the comparison must
        // ignore case or the conversion never runs.
        var isKmh = string.Equals(unit, "kmh", StringComparison.OrdinalIgnoreCase);
        var speedMph = isKmh ? speed / 1.6 : speed;

        row["GPS.Speed"] = speedMph;
        row["GPS.SpeedUnit"] = "MPH";
        // Compare the normalised value so the threshold is always applied in MPH.
        row["Speeding"] = speedMph > 50;
        yield return row;
    }
}

// Group incoming messages, pass each group through FilterLowSpeed and write the
// results to the output topic.
// NOTE(review): the sample message has no "car_id" field — confirm which key
// GroupBy should use.
consumer
    .GroupBy("car_id")
    .FilterStream(FilterLowSpeed)
    .Sink(producer);

// Start the application. NOTE(review): presumably blocks until shutdown — confirm.
App.Run();