Other services
Danger
This tutorial is out of date. Please check the tutorials overview for our latest tutorials.
There are some additional services in the pipeline that provide useful functionality. These range from S3 storage of data to calculation of the maximum vehicles per day in a specific location.
Briefly, these services are:
- Stream merge - merges all the traffic cam streams into a single stream to make things easier to process in the UI.
- Cam vehicles - calculates the total vehicles, where a vehicle is defined as one of: car, bus, truck, motorbike. This number is fed into the Max Vehicle Window service.
- Max Vehicle Window - calculates the maximum vehicles over a time window of one day. This service sends messages to the Data API service.
- Data buffer - provides a one second data buffer. This helps reduce load on the Data API service.
- Data API - this REST API service provides the following endpoints:
  - max_vehicles - last known maximum vehicle count for each camera, over a 24 hour rolling window.
  - detected_objects - output from the computer vision service for all cameras, excluding images.
  - vehicles - the last known vehicle count for each camera.
  - image - the last image from the specified camera.
  This API is called by the UI to obtain useful data.
- S3 - stores objects in Amazon Web Services (AWS) S3. This service enables you to persist any data or results you might like to keep more permanently.
Tip
If you ever need to obtain the stream ID, and it is not in the messages available to the service, you can read it from the stream object by using the stream_id property, for example, stream_id = stream_consumer.stream_id.
Stream merge
This service prepares data for ease of processing by the UI by merging all streams onto a single stream. The input stream is image-processed, and the output stream is image-processed-merged. Note that the code also Base64 encodes the image, and decodes the result to a UTF-8 string, before publishing to the output topic. The UI uses the Quix Streaming Reader to read the messages from image-processed-merged, including the Base64 encoded image data.
The key code:
# Callback triggered for each new parameter data.
def on_dataframe_handler(self, stream_consumer: qx.StreamConsumer, df: pd.DataFrame):
    df["TAG__parent_streamId"] = self.consumer_stream.stream_id
    df['image'] = df["image"].apply(lambda x: str(base64.b64encode(x).decode('utf-8')))
    self.producer_topic.get_or_create_stream("image-feed") \
        .timeseries.buffer.publish(df)
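The Base64 step can be looked at in isolation. The following is a minimal sketch that uses only the Python standard library rather than the pandas apply call in the service. The helper names encode_image and decode_image are hypothetical, and the sample bytes are a placeholder, not real camera data.

```python
import base64


def encode_image(raw: bytes) -> str:
    """Encode raw image bytes as Base64 text, as the service does for each row."""
    return base64.b64encode(raw).decode("utf-8")


def decode_image(encoded: str) -> bytes:
    """Reverse the encoding to recover the original bytes."""
    return base64.b64decode(encoded)


# Placeholder bytes standing in for image data (the start of a JPEG header).
sample = b"\xff\xd8\xff\xe0"
encoded = encode_image(sample)
restored = decode_image(encoded)
```

Encoding to text lets the image travel as a string value in the message; any consumer can recover the original bytes with the reverse call.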
Cam vehicles
This service simply adds together objects of the following types: car, bus, truck, motorbike to obtain a total number of vehicles. It classes these objects as vehicles. The message output to the next stage in the pipeline, max vehicles, is as follows:
{
    "Epoch": 0,
    "Timestamps": [1694077540745375700],
    "NumericValues": {
        "truck": [1],
        "car": [2],
        "lat": [51.4075],
        "lon": [-0.19236],
        "delta": [-2.177236557006836],
        "vehicles": [3]
    },
    "StringValues": {},
    "BinaryValues": {
        "image": ["(Binary of 157.97 KB)"]
    },
    "TagValues": {}
}
In this example there are 2 cars and 1 truck, giving a vehicles count of 3.
The main code is:
def on_dataframe_received_handler(stream_consumer: qx.StreamConsumer, df: pd.DataFrame):
    # List of vehicle columns
    vehicle_columns = ['car', 'bus', 'truck', 'motorbike']
    # Calculate the total vehicle count based on existing columns
    total_vehicle_count = df.apply(lambda row: sum(row.get(column, 0) for column in vehicle_columns), axis=1)
    # Store vehicle count in the data frame
    df["vehicles"] = total_vehicle_count
    stream_producer = topic_producer.get_or_create_stream(stream_id = stream_consumer.stream_id)
    # Publish data frame to the producer stream
    stream_producer.timeseries.buffer.publish(df)
You can find out more about pandas DataFrames in the pandas documentation.
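The summation itself does not depend on pandas. As a rough illustration, the sketch below applies the same rule to a plain dictionary, treating any vehicle type that is absent as zero. The function name count_vehicles is hypothetical; the sample values are taken from the example message above.

```python
VEHICLE_TYPES = ("car", "bus", "truck", "motorbike")


def count_vehicles(object_counts: dict) -> int:
    """Sum the counts of the vehicle types, treating missing types as zero."""
    return sum(object_counts.get(vehicle_type, 0) for vehicle_type in VEHICLE_TYPES)


# Values from the example message: non-vehicle fields such as lat and lon are ignored.
row = {"truck": 1, "car": 2, "lat": 51.4075, "lon": -0.19236}
total = count_vehicles(row)
```

Here total is 3, matching the vehicles value in the example message.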
Max Vehicle Window
The max vehicles service takes the total vehicle count and finds the maximum value over a one day window. This value is made available to the Data API service. The message passed to the Data API has the following format:
{
    "Epoch": 0,
    "Timestamps": [1694088514402644000],
    "NumericValues": {
        "max_vehicles": [8]
    },
    "StringValues": {},
    "TagValues": {
        "window_start": ["2023-09-06 12:08:12.394372"],
        "window_end": ["2023-09-07 12:08:12.394372"],
        "window": ["1d 0h 0m"],
        "cam": ["JamCams_00001.08959"]
    }
}
You can see the exact time window is recorded, along with the maximum vehicle count during that time window. This provides a crude measure of the capacity of the road. This capacity can then be used by the UI to calculate a percentage of capacity. For example, if there are 8 cars on a road, and the maximum seen is 10, then the road is considered to be at 80% capacity, and this is displayed on the UI, as shown in the following screenshot:
This service uses state, as you need to save the maximum count reached during the time window.
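To make the role of state concrete, here is a minimal in-memory sketch of a rolling maximum, together with the percentage-of-capacity calculation described above. It is not the service's implementation: the class and function names are hypothetical, it assumes timestamps arrive in order, and it keeps state in memory only, whereas a deployed service would need to persist it.

```python
from collections import deque
from datetime import datetime, timedelta


class MaxVehicleWindow:
    """Track the maximum vehicle count seen within a rolling time window."""

    def __init__(self, window: timedelta = timedelta(days=1)):
        self.window = window
        # State: (timestamp, count) pairs that still fall inside the window.
        self.samples = deque()

    def update(self, timestamp: datetime, vehicles: int) -> int:
        """Record a new count and return the maximum over the current window."""
        self.samples.append((timestamp, vehicles))
        window_start = timestamp - self.window
        # Drop samples that have aged out of the window.
        while self.samples and self.samples[0][0] < window_start:
            self.samples.popleft()
        return max(count for _, count in self.samples)


def percent_of_capacity(current: int, max_vehicles: int) -> float:
    """Express the current count as a percentage of the window maximum."""
    return 100.0 * current / max_vehicles if max_vehicles else 0.0


tracker = MaxVehicleWindow()
start = datetime(2023, 9, 6, 12, 0)
first = tracker.update(start, 10)
second = tracker.update(start + timedelta(hours=12), 8)
third = tracker.update(start + timedelta(hours=25), 8)
```

In this run the maximum stays at 10 while the first sample is inside the one day window, then falls to 8 once that sample is more than a day old.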
Data buffer
This service provides a one second data buffer, which reduces load on the Data API service. The service has three input topics, max-vehicles, processed-images, and vehicle-counts, and one output topic, buffered-data.
Data API
The Data API service offloads calculations that could otherwise be done in the web client, and provides key data only when the UI requests it through the service's REST API.
The Data API provides the following endpoints:
- max_vehicles
- detected_objects
- vehicles
- image
These are used by the UI to obtain and then display the data on the web interface.
Max Vehicles
Returns the maximum number of "vehicles" seen on a camera, where a vehicle is one of: car, bus, truck, or motorbike.
For a GET on the endpoint /max_vehicles, the response is a dictionary item per camera:
- Key=camera name
- Value=max vehicle count
Example response JSON:
This service is implemented as a simple Flask web app hosted in Quix.
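As a rough sketch of the key/value shape described above, the following code keeps the last known maximum per camera from messages in the format shown in the Max Vehicle Window section. The class name MaxVehiclesStore and its methods are hypothetical, and the Flask routing is omitted; this only illustrates how such a response body could be assembled.

```python
import json


class MaxVehiclesStore:
    """Keep the last known maximum vehicle count for each camera."""

    def __init__(self):
        self._latest = {}

    def on_message(self, message: dict) -> None:
        """Update the store from one Max Vehicle Window message."""
        camera = message["TagValues"]["cam"][0]
        self._latest[camera] = message["NumericValues"]["max_vehicles"][0]

    def response(self) -> dict:
        """Return the camera name to max vehicle count mapping."""
        return dict(self._latest)


# Message trimmed from the Max Vehicle Window example earlier on this page.
message = {
    "NumericValues": {"max_vehicles": [8]},
    "TagValues": {"cam": ["JamCams_00001.08959"]},
}
store = MaxVehiclesStore()
store.on_message(message)
body = json.dumps(store.response())
```

A web handler for /max_vehicles could then return body as its JSON payload.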
Detected Objects
Returns a dictionary of all the data for a given camera (except for the images as these are quite large to store, even temporarily).
For a GET on the endpoint /detected_objects, the response is an array of:
- Key=camera name
- Value=dictionary of the data
Where the data dictionary is:
- object counts (car: 3, bus: 11 etc)
- lat
- lon
- timestamp
Using this you can plot/display every camera and its count as soon as you get this data.
Example response JSON:
[
    "JamCams_00001.01419": {
        "car": {"0": 3.0},
        "delta": {"0": -1.0003459453582764},
        "image": {"0": ""},
        "lat": {"0": 51.5596},
        "lon": {"0": -0.07424},
        "person": {"0": 3.0},
        "timestamp": {"0": 1692471825406959867},
        "traffic light": {"0": 1.0},
        "truck": {"0": 1.0}
    },
    ...
]
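In the example, each field holds its value under a row key of "0". A client that wants a flat record per camera could collapse that layer, as in this minimal sketch. The function name flatten_camera_data is hypothetical, and the code assumes every field carries a single row keyed "0", as in the example above.

```python
def flatten_camera_data(data: dict) -> dict:
    """Collapse {"field": {"0": value}} entries into {"field": value}."""
    return {field: rows["0"] for field, rows in data.items() if "0" in rows}


# Data for one camera, copied from the example response above.
camera_data = {
    "car": {"0": 3.0},
    "delta": {"0": -1.0003459453582764},
    "image": {"0": ""},
    "lat": {"0": 51.5596},
    "lon": {"0": -0.07424},
    "person": {"0": 3.0},
    "timestamp": {"0": 1692471825406959867},
    "traffic light": {"0": 1.0},
    "truck": {"0": 1.0},
}
flat = flatten_camera_data(camera_data)
```

The flat record gives direct access to values such as flat["lat"] and flat["car"] when plotting a camera and its counts.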
Vehicles
The last known vehicle count for each camera.
See the README for more information.
Image
The last image from the specified camera.
See the README for more information.
S3
This is the standard Quix code sample AWS S3 destination connector. It takes messages on the input topic and writes them to S3. There is an optional batching facility whereby you can batch messages and then write them to S3 in a batch - this can be more efficient for higher frequency data. You can control batching based on time interval or message count.
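The batching idea can be sketched independently of the connector. The example below is an assumption-laden illustration, not the connector's code: the Batcher class, its parameters, and the write_batch callable (which stands in for an upload to S3) are all hypothetical. A batch is flushed when either the message count or the time interval limit is reached.

```python
import time


class Batcher:
    """Collect messages and flush them when a count or time limit is reached."""

    def __init__(self, write_batch, max_count=100, max_interval_seconds=10.0,
                 clock=time.monotonic):
        self.write_batch = write_batch  # hypothetical stand-in for an S3 upload
        self.max_count = max_count
        self.max_interval_seconds = max_interval_seconds
        self.clock = clock  # injectable so the timing can be controlled in tests
        self.messages = []
        self.batch_started = None

    def add(self, message) -> None:
        """Add a message; the limits are only checked when a message arrives."""
        if not self.messages:
            self.batch_started = self.clock()
        self.messages.append(message)
        elapsed = self.clock() - self.batch_started
        if len(self.messages) >= self.max_count or elapsed >= self.max_interval_seconds:
            self.flush()

    def flush(self) -> None:
        """Write out the pending messages as one batch, if there are any."""
        if self.messages:
            self.write_batch(self.messages)
            self.messages = []


# Usage with a list standing in for S3 and a controllable clock.
written = []
now = [0.0]
batcher = Batcher(written.append, max_count=3, max_interval_seconds=5.0,
                  clock=lambda: now[0])
for item in ("a", "b", "c"):
    batcher.add(item)
```

Because this sketch only checks the time limit when a message arrives, a quiet topic could hold a partial batch indefinitely; a fuller version would also flush from a timer and on shutdown.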
See also
For more information refer to:
- Connectors - connectors, both source and destination.
- Quix Streams - the client library.