Skip to content

Storage types

Bases: str

The file obect returned by the storage.

Source code in fastapi_storages/base.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
class StorageFile(str):
    """
    The file obect returned by the storage.
    """

    def __new__(cls, name: str, storage: BaseStorage) -> "StorageFile":
        return str.__new__(cls, storage.get_path(name))

    def __init__(self, *, name: str, storage: BaseStorage):
        self._name = name
        self._storage = storage

    @property
    def name(self) -> str:
        """File name including extension."""

        return self._storage.get_name(self._name)

    @property
    def path(self) -> str:
        """Complete file path."""

        return self._storage.get_path(self._name)

    @property
    def size(self) -> int:
        """File size in bytes."""

        return self._storage.get_size(self._name)

    def open(self) -> BinaryIO:
        """
        Open a file handle of the file.
        """

        return self._storage.open(self._name)

    def write(self, file: BinaryIO) -> str:
        """
        Write input file which is opened in binary mode to destination.
        """

        if not self._storage.OVERWRITE_EXISTING_FILES:
            self._name = self._storage.generate_new_filename(self._name)

        return self._storage.write(file=file, name=self._name)

    def delete(self) -> None:
        """
        Delete file from the storage
        """

        return self._storage.delete(self._name)

    def __str__(self) -> str:
        return self.path

name property

File name including extension.

path property

Complete file path.

size property

File size in bytes.

delete()

Delete file from the storage

Source code in fastapi_storages/base.py
78
79
80
81
82
83
def delete(self) -> None:
    """
    Delete file from the storage
    """

    return self._storage.delete(self._name)

open()

Open a file handle of the file.

Source code in fastapi_storages/base.py
61
62
63
64
65
66
def open(self) -> BinaryIO:
    """
    Open a file handle of the file.
    """

    return self._storage.open(self._name)

write(file)

Write input file which is opened in binary mode to destination.

Source code in fastapi_storages/base.py
68
69
70
71
72
73
74
75
76
def write(self, file: BinaryIO) -> str:
    """
    Write input file which is opened in binary mode to destination.
    """

    if not self._storage.OVERWRITE_EXISTING_FILES:
        self._name = self._storage.generate_new_filename(self._name)

    return self._storage.write(file=file, name=self._name)

Bases: StorageFile

Inherits features of StorageFile and adds image specific properties.

Source code in fastapi_storages/base.py
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
class StorageImage(StorageFile):
    """
    Inherits features of `StorageFile` and adds image specific properties.
    """

    def __new__(
        cls, name: str, storage: BaseStorage, height: int, width: int
    ) -> "StorageImage":
        return str.__new__(cls, storage.get_path(name))

    def __init__(
        self, *, name: str, storage: BaseStorage, height: int, width: int
    ) -> None:
        super().__init__(name=name, storage=storage)
        self._width = width
        self._height = height

    @property
    def height(self) -> int:
        """
        Image height in pixels.
        """

        return self._height

    @property
    def width(self) -> int:
        """
        Image width in pixels.
        """

        return self._width

height property

Image height in pixels.

width property

Image width in pixels.

Bases: BaseStorage

File system storage which stores files in the local filesystem. You might want to use this with the FileType type.

Source code in fastapi_storages/filesystem.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
class FileSystemStorage(BaseStorage):
    """
    File system storage which stores files in the local filesystem.
    You might want to use this with the `FileType` type.
    """

    default_chunk_size = 64 * 1024

    def __init__(self, path: str) -> None:
        self._path = Path(path)
        self._path.mkdir(parents=True, exist_ok=True)

    def get_name(self, name: str) -> str:
        """
        Get the normalized name of the file.
        """

        parts = [s for p in Path(name).parts if (s := secure_filename(p))]
        return str(Path(*parts)) if parts else secure_filename(name)

    def get_path(self, name: str) -> str:
        """
        Get full path to the file.
        """

        return str(self._path / Path(name))

    def get_size(self, name: str) -> int:
        """
        Get file size in bytes.
        """

        return (self._path / name).stat().st_size

    def open(self, name: str) -> BinaryIO:
        """
        Open a file handle of the file object in binary mode.
        """

        path = self.get_path(name)
        return open(path, "rb")

    def write(self, file: BinaryIO, name: str) -> str:
        """
        Write input file which is opened in binary mode to destination.
        """

        filename = self.get_name(name)
        path = self.get_path(filename)

        self._path.joinpath(filename).parent.mkdir(parents=True, exist_ok=True)
        file.seek(0, 0)
        with open(path, "wb") as output:
            while True:
                chunk = file.read(self.default_chunk_size)
                if not chunk:
                    break
                output.write(chunk)

        return str(path)

    def delete(self, name: str) -> None:
        """
        Delete the file from the filesystem.
        """

        Path(self.get_path(name)).unlink()

    def generate_new_filename(self, filename: str) -> str:
        counter = 0
        prefix = Path(filename).parent
        stem = Path(filename).stem
        extension = Path(filename).suffix
        path = self._path / filename

        while path.exists():
            counter += 1
            path = self._path / prefix / f"{stem}_{counter}{extension}"

        return str(prefix / path.name)

delete(name)

Delete the file from the filesystem.

Source code in fastapi_storages/filesystem.py
69
70
71
72
73
74
def delete(self, name: str) -> None:
    """
    Delete the file from the filesystem.
    """

    Path(self.get_path(name)).unlink()

get_name(name)

Get the normalized name of the file.

Source code in fastapi_storages/filesystem.py
20
21
22
23
24
25
26
def get_name(self, name: str) -> str:
    """
    Get the normalized name of the file.
    """

    parts = [s for p in Path(name).parts if (s := secure_filename(p))]
    return str(Path(*parts)) if parts else secure_filename(name)

get_path(name)

Get full path to the file.

Source code in fastapi_storages/filesystem.py
28
29
30
31
32
33
def get_path(self, name: str) -> str:
    """
    Get full path to the file.
    """

    return str(self._path / Path(name))

get_size(name)

Get file size in bytes.

Source code in fastapi_storages/filesystem.py
35
36
37
38
39
40
def get_size(self, name: str) -> int:
    """
    Get file size in bytes.
    """

    return (self._path / name).stat().st_size

open(name)

Open a file handle of the file object in binary mode.

Source code in fastapi_storages/filesystem.py
42
43
44
45
46
47
48
def open(self, name: str) -> BinaryIO:
    """
    Open a file handle of the file object in binary mode.
    """

    path = self.get_path(name)
    return open(path, "rb")

write(file, name)

Write input file which is opened in binary mode to destination.

Source code in fastapi_storages/filesystem.py
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
def write(self, file: BinaryIO, name: str) -> str:
    """
    Write input file which is opened in binary mode to destination.
    """

    filename = self.get_name(name)
    path = self.get_path(filename)

    self._path.joinpath(filename).parent.mkdir(parents=True, exist_ok=True)
    file.seek(0, 0)
    with open(path, "wb") as output:
        while True:
            chunk = file.read(self.default_chunk_size)
            if not chunk:
                break
            output.write(chunk)

    return str(path)

Bases: BaseStorage

Amazon S3 or any S3 compatible storage backend. You might want to use this with the FileType type. Requires boto3 to be installed.

Source code in fastapi_storages/s3.py
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
class S3Storage(BaseStorage):
    """
    Amazon S3 or any S3 compatible storage backend.
    You might want to use this with the `FileType` type.
    Requires `boto3` to be installed.
    """

    default_content_type = "application/octet-stream"

    AWS_ACCESS_KEY_ID = lookup_env("AWS_ACCESS_KEY_ID")
    """AWS access key ID. Either set here or as an environment variable."""

    AWS_SECRET_ACCESS_KEY = lookup_env("AWS_SECRET_ACCESS_KEY")
    """AWS secret access key. Either set here or as an environment variable."""

    AWS_S3_BUCKET_NAME = lookup_env("AWS_S3_BUCKET_NAME")
    """AWS S3 bucket name to use."""

    AWS_S3_ENDPOINT_URL = lookup_env("AWS_S3_ENDPOINT_URL")
    """AWS S3 endpoint URL."""

    AWS_S3_USE_SSL = lookup_env("AWS_S3_USE_SSL", True)
    """Indicate if SSL should be used."""

    AWS_DEFAULT_ACL = lookup_env("AWS_DEFAULT_ACL")
    """Optional ACL set on the object like `public-read`.
    By default file will be private."""

    AWS_QUERYSTRING_AUTH = lookup_env("AWS_QUERYSTRING_AUTH", False)
    """Indicate if query parameter authentication should be used in URLs."""

    AWS_S3_CUSTOM_DOMAIN = lookup_env("AWS_S3_CUSTOM_DOMAIN")
    """Custom domain to use for serving object URLs."""

    AWS_S3_SIGNATURE_VERSION = lookup_env("AWS_S3_SIGNATURE_VERSION")
    """Signature version for request signing, e.g. "s3v4"."""

    AWS_S3_OBJECT_PARAMETERS: dict = {}
    """Extra parameters passed to every upload.
    E.g. {"ServerSideEncryption": "aws:kms", "SSEKMSKeyId": "..."}."""

    def __init__(self) -> None:
        assert boto3 is not None, "'boto3' is not installed"
        assert Config is not None, "'boto3' is not installed"
        assert ClientError is not None, "'boto3' is not installed"

        self._http_scheme = "https" if self.AWS_S3_USE_SSL else "http"

        client_kwargs: dict = {"use_ssl": self.AWS_S3_USE_SSL}

        if self.AWS_S3_ENDPOINT_URL:
            assert not self.AWS_S3_ENDPOINT_URL.startswith("http"), (
                "URL should not contain protocol"
            )
            client_kwargs["endpoint_url"] = (
                f"{self._http_scheme}://{self.AWS_S3_ENDPOINT_URL}"
            )

        if self.AWS_ACCESS_KEY_ID:
            client_kwargs["aws_access_key_id"] = self.AWS_ACCESS_KEY_ID
        if self.AWS_SECRET_ACCESS_KEY:
            client_kwargs["aws_secret_access_key"] = self.AWS_SECRET_ACCESS_KEY

        if self.AWS_S3_SIGNATURE_VERSION:
            client_kwargs["config"] = Config(
                signature_version=self.AWS_S3_SIGNATURE_VERSION
            )

        self._s3 = boto3.client("s3", **client_kwargs)

        if self.AWS_S3_ENDPOINT_URL:
            self._url = f"{self._http_scheme}://{self.AWS_S3_ENDPOINT_URL}"
        else:
            self._url = self._s3.meta.endpoint_url

    def get_name(self, name: str) -> str:
        """
        Get the normalized name of the file.
        """

        filename = secure_filename(Path(name).name)
        return str(Path(name).with_name(filename))

    def get_path(self, name: str) -> str:
        """
        Get full URL to the file.
        """

        key = self.get_name(name)

        if self.AWS_S3_CUSTOM_DOMAIN:
            return "{}://{}/{}".format(
                self._http_scheme,
                self.AWS_S3_CUSTOM_DOMAIN,
                key,
            )

        if self.AWS_QUERYSTRING_AUTH:
            params = {"Bucket": self.AWS_S3_BUCKET_NAME, "Key": key}
            return self._s3.generate_presigned_url("get_object", Params=params)

        return "{}/{}/{}".format(
            self._url,
            self.AWS_S3_BUCKET_NAME,
            key,
        )

    def get_size(self, name: str) -> int:
        """
        Get file size in bytes.
        """

        key = self.get_name(name)
        return self._s3.head_object(Bucket=self.AWS_S3_BUCKET_NAME, Key=key)[
            "ContentLength"
        ]

    def write(self, file: BinaryIO, name: str) -> str:
        """
        Write input file which is opened in binary mode to destination.
        """

        file.seek(0, 0)
        key = self.get_name(name)
        content_type, _ = mimetypes.guess_type(key)
        params: dict = {
            "ACL": self.AWS_DEFAULT_ACL,
            "ContentType": content_type or self.default_content_type,
            **self.AWS_S3_OBJECT_PARAMETERS,
        }
        self._s3.upload_fileobj(file, self.AWS_S3_BUCKET_NAME, key, ExtraArgs=params)
        return key

    def delete(self, name: str) -> None:
        """
        Delete the file from S3
        """

        self._s3.delete_object(Bucket=self.AWS_S3_BUCKET_NAME, Key=self.get_name(name))

    def generate_new_filename(self, filename: str) -> str:
        key = self.get_name(filename)
        stem = Path(filename).stem
        suffix = Path(filename).suffix
        counter = 0

        while self._check_object_exists(key):
            counter += 1
            filename = f"{stem}_{counter}{suffix}"
            key = self.get_name(filename)

        return filename

    def _check_object_exists(self, key: str) -> bool:
        try:
            self._s3.head_object(Bucket=self.AWS_S3_BUCKET_NAME, Key=key)
            return True
        except ClientError as e:
            if e.response.get("Error", {}).get("Code") == "404":
                return False
            raise

AWS_ACCESS_KEY_ID = lookup_env('AWS_ACCESS_KEY_ID') class-attribute instance-attribute

AWS access key ID. Either set here or as an environment variable.

AWS_DEFAULT_ACL = lookup_env('AWS_DEFAULT_ACL') class-attribute instance-attribute

Optional ACL set on the object like public-read. By default file will be private.

AWS_QUERYSTRING_AUTH = lookup_env('AWS_QUERYSTRING_AUTH', False) class-attribute instance-attribute

Indicate if query parameter authentication should be used in URLs.

AWS_S3_BUCKET_NAME = lookup_env('AWS_S3_BUCKET_NAME') class-attribute instance-attribute

AWS S3 bucket name to use.

AWS_S3_CUSTOM_DOMAIN = lookup_env('AWS_S3_CUSTOM_DOMAIN') class-attribute instance-attribute

Custom domain to use for serving object URLs.

AWS_S3_ENDPOINT_URL = lookup_env('AWS_S3_ENDPOINT_URL') class-attribute instance-attribute

AWS S3 endpoint URL.

AWS_S3_OBJECT_PARAMETERS = {} class-attribute instance-attribute

Extra parameters passed to every upload. E.g. {"ServerSideEncryption": "aws:kms", "SSEKMSKeyId": "..."}.

AWS_S3_SIGNATURE_VERSION = lookup_env('AWS_S3_SIGNATURE_VERSION') class-attribute instance-attribute

Signature version for request signing, e.g. "s3v4".

AWS_S3_USE_SSL = lookup_env('AWS_S3_USE_SSL', True) class-attribute instance-attribute

Indicate if SSL should be used.

AWS_SECRET_ACCESS_KEY = lookup_env('AWS_SECRET_ACCESS_KEY') class-attribute instance-attribute

AWS secret access key. Either set here or as an environment variable.

delete(name)

Delete the file from S3

Source code in fastapi_storages/s3.py
151
152
153
154
155
156
def delete(self, name: str) -> None:
    """
    Delete the file from S3
    """

    self._s3.delete_object(Bucket=self.AWS_S3_BUCKET_NAME, Key=self.get_name(name))

get_name(name)

Get the normalized name of the file.

Source code in fastapi_storages/s3.py
93
94
95
96
97
98
99
def get_name(self, name: str) -> str:
    """
    Get the normalized name of the file.
    """

    filename = secure_filename(Path(name).name)
    return str(Path(name).with_name(filename))

get_path(name)

Get full URL to the file.

Source code in fastapi_storages/s3.py
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
def get_path(self, name: str) -> str:
    """
    Get full URL to the file.
    """

    key = self.get_name(name)

    if self.AWS_S3_CUSTOM_DOMAIN:
        return "{}://{}/{}".format(
            self._http_scheme,
            self.AWS_S3_CUSTOM_DOMAIN,
            key,
        )

    if self.AWS_QUERYSTRING_AUTH:
        params = {"Bucket": self.AWS_S3_BUCKET_NAME, "Key": key}
        return self._s3.generate_presigned_url("get_object", Params=params)

    return "{}/{}/{}".format(
        self._url,
        self.AWS_S3_BUCKET_NAME,
        key,
    )

get_size(name)

Get file size in bytes.

Source code in fastapi_storages/s3.py
125
126
127
128
129
130
131
132
133
def get_size(self, name: str) -> int:
    """
    Get file size in bytes.
    """

    key = self.get_name(name)
    return self._s3.head_object(Bucket=self.AWS_S3_BUCKET_NAME, Key=key)[
        "ContentLength"
    ]

write(file, name)

Write input file which is opened in binary mode to destination.

Source code in fastapi_storages/s3.py
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
def write(self, file: BinaryIO, name: str) -> str:
    """
    Write input file which is opened in binary mode to destination.
    """

    file.seek(0, 0)
    key = self.get_name(name)
    content_type, _ = mimetypes.guess_type(key)
    params: dict = {
        "ACL": self.AWS_DEFAULT_ACL,
        "ContentType": content_type or self.default_content_type,
        **self.AWS_S3_OBJECT_PARAMETERS,
    }
    self._s3.upload_fileobj(file, self.AWS_S3_BUCKET_NAME, key, ExtraArgs=params)
    return key