
File Source

Info

This is a Community connector. Test it before using in production.

To learn more about differences between Core and Community connectors, see the Community and Core Connectors page.

This source reads messages from local files, such as JSONlines or Parquet files. It also supports file (de)compression.

The resulting messages can be produced in "replay" mode, where the time between produced records matches the original timing as closely as possible (per topic partition only).
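To make the replay idea concrete, here is a minimal sketch of the timing logic, assuming the records of one topic partition arrive as (timestamp in milliseconds, value) pairs sorted by timestamp. `replay_delays`, `replay` and the `Record` shape are hypothetical helpers for illustration, not the FileSource API or its actual implementation.

```python
import time
from typing import Callable, Iterable, Iterator, Sequence, Tuple

# Hypothetical record shape: (timestamp in milliseconds, message value)
Record = Tuple[int, object]


def replay_delays(timestamps_ms: Iterable[int]) -> Iterator[float]:
    """Yield the seconds to wait before producing each record so that the gaps
    between produced records match the gaps between their original timestamps."""
    previous = None
    for ts in timestamps_ms:
        # The first record goes out immediately; each later one waits for the
        # original gap, clamped at zero in case timestamps go backwards.
        delay = 0.0 if previous is None else max(ts - previous, 0) / 1000
        previous = ts
        yield delay


def replay(records: Sequence[Record], produce: Callable[[object], None]) -> None:
    """Produce the records of ONE partition in order, sleeping between them."""
    timestamps = [ts for ts, _ in records]
    for (_, value), delay in zip(records, replay_delays(timestamps)):
        time.sleep(delay)
        produce(value)


produced = []
replay([(1_000, "a"), (1_250, "b"), (1_300, "c")], produced.append)
```

The delays are computed per partition only, mirroring the note above. Because `time.sleep` ignores the time spent producing each record, a real implementation would likely track elapsed wall-clock time instead; this sketch keeps it simple.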

The File Source connector is generally intended to be used alongside the related File Sink: it expects the same folder structure and data formatting that the File Sink produces.

How To Use

To use a File Source, you need to create an instance of FileSource and pass it to the app.dataframe() method.

Note that you should generally point the File Source at a single topic folder rather than a root folder containing many topics; otherwise, topic partitions may not line up correctly.

For the full description of expected parameters, see the File Source API page.

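    from quixstreams import Application
    from quixstreams.sources.community.file import FileSource

    app = Application(broker_address="localhost:9092")

    # Point at a single topic folder, not the root folder holding many topics
    source = FileSource(
        filepath="/path/to/my/topic_folder",
        file_format="json",       # format of the offset files
        file_compression="gzip",  # the files are gzip-compressed
        as_replay=True,           # reproduce the original timing between records
    )

    # Print each consumed message together with its metadata
    sdf = app.dataframe(source=source).print(metadata=True)

    if __name__ == "__main__":
        app.run()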

File hierarchy/structure

The File Source expects a folder structure like so:

    my_sinked_topics/
    ├── topic_a/          # topic name (pass this path to the File Source!)
    │   ├── 0/            # topic partition number
    │   │   ├── 0000.ext  # formatted offset files (e.g. JSON)
    │   │   └── 0011.ext
    │   └── 1/
    │       ├── 0003.ext
    │       └── 0016.ext
    └── topic_b/
        └── etc...

This is the default structure generated by the File Sink.
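As an illustration of how this layout can be traversed, the sketch below maps each partition folder to its offset files in numeric order. `offset_files_by_partition` is a hypothetical helper, not part of the FileSource API, and it assumes that every file name begins with its zero-padded offset (as in `0000.ext`, `0011.ext`) and that only numeric folder names are partitions.

```python
from pathlib import Path
from typing import Dict, List


def offset_files_by_partition(topic_folder: str) -> Dict[int, List[Path]]:
    """Map each partition number to its offset files, sorted by offset."""
    partitions: Dict[int, List[Path]] = {}
    for partition_dir in Path(topic_folder).iterdir():
        # Only numeric folder names (0/, 1/, ...) are treated as partitions.
        if not (partition_dir.is_dir() and partition_dir.name.isdigit()):
            continue
        files = [f for f in partition_dir.iterdir() if f.is_file()]
        # Sort on the numeric part before the first dot, so "0011.jsonl.gz"
        # sorts as offset 11 regardless of padding or extra extensions.
        files.sort(key=lambda f: int(f.name.split(".")[0]))
        partitions[int(partition_dir.name)] = files
    # Return partitions in ascending order (0, 1, 2, ...).
    return dict(sorted(partitions.items()))
```

Sorting numerically rather than by raw file name keeps the order correct even if the zero-padding width ever differs between files.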

File data format/schema

The expected data schema is largely dependent on the file format chosen.

For easiest use with the File Sink, you can follow these patterns:

  • for row-based formats (like JSON), each record should have the following fields, where _value is the entirety of the message value, ideally as a JSON-deserializable item:
      • _key
      • _value
      • _timestamp

  • for columnar formats (like Parquet), no explicit _value field is expected; instead, include each value field as its own column, alongside _key and _timestamp:
      • _key
      • _timestamp
      • field_a
      • field_b
      • etc.
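The two patterns above can be sketched with the standard library alone. This is a minimal illustration, not the File Sink's own writer: it assumes that JSON files are newline-delimited JSON (one record per line) compressed with gzip, matching `file_format="json"` and `file_compression="gzip"` in the earlier example, and that `_timestamp` is in epoch milliseconds. The helper names `write_jsonl_gz`, `read_jsonl_gz` and `columnar_row_to_message` are hypothetical.

```python
import gzip
import json
from typing import Any, Dict, List


def write_jsonl_gz(path: str, records: List[Dict[str, Any]]) -> None:
    """Write row-based records (one JSON object per line), gzip-compressed."""
    with gzip.open(path, "wt", encoding="utf-8") as f:
        for record in records:
            f.write(json.dumps(record) + "\n")


def read_jsonl_gz(path: str) -> List[Dict[str, Any]]:
    """Read the records back, skipping blank lines."""
    with gzip.open(path, "rt", encoding="utf-8") as f:
        return [json.loads(line) for line in f if line.strip()]


def columnar_row_to_message(row: Dict[str, Any]) -> Dict[str, Any]:
    """Split a columnar row into key, timestamp, and a value built from
    every remaining column (field_a, field_b, ...)."""
    value = {k: v for k, v in row.items() if k not in ("_key", "_timestamp")}
    return {"_key": row["_key"], "_timestamp": row["_timestamp"], "_value": value}


# A row-based record: the whole message value lives under "_value".
record = {"_key": "user-1", "_value": {"clicks": 3}, "_timestamp": 1700000000000}
```

In the columnar case the message value is reassembled from the individual columns, which is why no explicit `_value` column is needed.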

Topic

The default topic's partition count matches the number of partition folders found within the provided topic folder.
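A minimal sketch of this rule, assuming partitions are the numerically named subfolders of the topic folder. `partition_count` is a hypothetical helper, not part of the FileSource API; it simply counts those folders, and how the real connector handles gaps (for example `0/` and `2/` without `1/`) is not specified here.

```python
from pathlib import Path


def partition_count(topic_folder: str) -> int:
    """Count numeric partition folders (0/, 1/, ...) directly under a folder."""
    return sum(
        1 for p in Path(topic_folder).iterdir() if p.is_dir() and p.name.isdigit()
    )
```

Pointing this at the root folder (`my_sinked_topics/`) instead of a topic folder finds no numeric subfolders at all, because the children there are topic names. This is one way the partitions fail to line up when the source is not given a single topic folder.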