File Source
Info
This is a Community connector. Test it before using in production.
To learn more about differences between Core and Community connectors, see the Community and Core Connectors page.
This source enables reading from a localized file source, such as a JSONlines or Parquet file. It also supports file (de)compression.
The resulting messages can be produced in "replay" mode, where the time between record producing is matched as close as possible to the original. (per topic partition only).
The File Source connector is generally intended to be used alongside the related File Sink (in terms of expected file and data formatting).
How to use CSV Source
To use a CSV Source, you need to create and instance of FileSource
and pass it to the app.dataframe()
method.
One important thing to note is that you should in general point to a single topic folder (rather than a root folder with many topics) otherwise topic partitions may not line up correctly.
For the full description of expected parameters, see the File Source API page.
from quixstreams import Application
from quixstreams.sources.community.file import FileSource
app = Application(broker_address="localhost:9092")
source = FileSource(
filepath="/path/to/my/topic_folder",
file_format="json",
file_compression="gzip",
as_replay=True
)
sdf = app.dataframe(source=source).print(metadata=True)
if __name__ == "__main__":
app.run()
File hierarchy/structure
The File Source expects a folder structure like so:
my_sinked_topics/
├── topic_a/ # topic name (use this path to File Source!)
│ ├── 0/ # topic partition number
│ │ ├── 0000.ext # formatted offset files (ex: JSON)
│ │ └── 0011.ext
│ └── 1/
│ ├── 0003.ext
│ └── 0016.ext
└── topic_b/
└── etc...
This is the default structure generated by the File Sink.
File data format/schema
The expected data schema is largely dependent on the file format chosen.
For easiest use with the File Sink, you can follow these patterns:
- for row-based formats (like JSON), the expected data should have records with the following fields, where value is the entirety of the message value, ideally as a JSON-deserializable item:
_key
_value
-
_timestamp
-
for columnar formats (like Parquet), they do not expect an explicit
value
field; instead all columns should be included individually while including_key
and_timestamp
: _key
_timestamp
field_a
field_b
...
etc...
Topic
The default topic will have a partition count that reflects the partition count found within the provided topic's folder structure.