Skip to content

Storage types

Bases: str

The file obect returned by the storage.

Source code in fastapi_storages/base.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
class StorageFile(str):
    """
    The file obect returned by the storage.
    """

    def __new__(cls, name: str, storage: BaseStorage) -> "StorageFile":
        return str.__new__(cls, storage.get_path(name))

    def __init__(self, *, name: str, storage: BaseStorage):
        self._name = name
        self._storage = storage

    @property
    def name(self) -> str:
        """File name including extension."""

        return self._storage.get_name(self._name)

    @property
    def path(self) -> str:
        """Complete file path."""

        return self._storage.get_path(self._name)

    @property
    def size(self) -> int:
        """File size in bytes."""

        return self._storage.get_size(self._name)

    def open(self) -> BinaryIO:
        """
        Open a file handle of the file.
        """

        return self._storage.open(self._name)

    def write(self, file: BinaryIO) -> str:
        """
        Write input file which is opened in binary mode to destination.
        """

        if not self._storage.OVERWRITE_EXISTING_FILES:
            self._name = self._storage.generate_new_filename(self._name)

        return self._storage.write(file=file, name=self._name)

    def delete(self) -> None:
        """
        Delete file from the storage
        """

        return self._storage.delete(self._name)

    def __str__(self) -> str:
        return self.path

name property

File name including extension.

path property

Complete file path.

size property

File size in bytes.

delete()

Delete file from the storage

Source code in fastapi_storages/base.py
78
79
80
81
82
83
def delete(self) -> None:
    """
    Delete file from the storage
    """

    return self._storage.delete(self._name)

open()

Open a file handle of the file.

Source code in fastapi_storages/base.py
61
62
63
64
65
66
def open(self) -> BinaryIO:
    """
    Open a file handle of the file.
    """

    return self._storage.open(self._name)

write(file)

Write input file which is opened in binary mode to destination.

Source code in fastapi_storages/base.py
68
69
70
71
72
73
74
75
76
def write(self, file: BinaryIO) -> str:
    """
    Write input file which is opened in binary mode to destination.
    """

    if not self._storage.OVERWRITE_EXISTING_FILES:
        self._name = self._storage.generate_new_filename(self._name)

    return self._storage.write(file=file, name=self._name)

Bases: StorageFile

Inherits features of StorageFile and adds image specific properties.

Source code in fastapi_storages/base.py
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
class StorageImage(StorageFile):
    """
    Inherits features of `StorageFile` and adds image specific properties.
    """

    def __new__(
        cls, name: str, storage: BaseStorage, height: int, width: int
    ) -> "StorageImage":
        return str.__new__(cls, storage.get_path(name))

    def __init__(
        self, *, name: str, storage: BaseStorage, height: int, width: int
    ) -> None:
        super().__init__(name=name, storage=storage)
        self._width = width
        self._height = height

    @property
    def height(self) -> int:
        """
        Image height in pixels.
        """

        return self._height

    @property
    def width(self) -> int:
        """
        Image width in pixels.
        """

        return self._width

height property

Image height in pixels.

width property

Image width in pixels.

Bases: BaseStorage

File system storage which stores files in the local filesystem. You might want to use this with the FileType type.

Source code in fastapi_storages/filesystem.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
class FileSystemStorage(BaseStorage):
    """
    File system storage which stores files in the local filesystem.
    You might want to use this with the `FileType` type.
    """

    default_chunk_size = 64 * 1024

    def __init__(self, path: str) -> None:
        self._path = Path(path)
        self._path.mkdir(parents=True, exist_ok=True)

    def get_name(self, name: str) -> str:
        """
        Get the normalized name of the file.
        """

        return secure_filename(Path(name).name)

    def get_path(self, name: str) -> str:
        """
        Get full path to the file.
        """

        return str(self._path / Path(name))

    def get_size(self, name: str) -> int:
        """
        Get file size in bytes.
        """

        return (self._path / name).stat().st_size

    def open(self, name: str) -> BinaryIO:
        """
        Open a file handle of the file object in binary mode.
        """

        path = self.get_path(name)
        return open(path, "rb")

    def write(self, file: BinaryIO, name: str) -> str:
        """
        Write input file which is opened in binary mode to destination.
        """

        filename = self.get_name(name)
        path = self.get_path(filename)

        file.seek(0, 0)
        with open(path, "wb") as output:
            while True:
                chunk = file.read(self.default_chunk_size)
                if not chunk:
                    break
                output.write(chunk)

        return str(path)

    def delete(self, name: str) -> None:
        """
        Delete the file from the filesystem.
        """

        Path(self.get_path(name)).unlink()

    def generate_new_filename(self, filename: str) -> str:
        counter = 0
        path = self._path / filename
        stem, extension = Path(filename).stem, Path(filename).suffix

        while path.exists():
            counter += 1
            path = self._path / f"{stem}_{counter}{extension}"

        return path.name

delete(name)

Delete the file from the filesystem.

Source code in fastapi_storages/filesystem.py
67
68
69
70
71
72
def delete(self, name: str) -> None:
    """
    Delete the file from the filesystem.
    """

    Path(self.get_path(name)).unlink()

get_name(name)

Get the normalized name of the file.

Source code in fastapi_storages/filesystem.py
20
21
22
23
24
25
def get_name(self, name: str) -> str:
    """
    Get the normalized name of the file.
    """

    return secure_filename(Path(name).name)

get_path(name)

Get full path to the file.

Source code in fastapi_storages/filesystem.py
27
28
29
30
31
32
def get_path(self, name: str) -> str:
    """
    Get full path to the file.
    """

    return str(self._path / Path(name))

get_size(name)

Get file size in bytes.

Source code in fastapi_storages/filesystem.py
34
35
36
37
38
39
def get_size(self, name: str) -> int:
    """
    Get file size in bytes.
    """

    return (self._path / name).stat().st_size

open(name)

Open a file handle of the file object in binary mode.

Source code in fastapi_storages/filesystem.py
41
42
43
44
45
46
47
def open(self, name: str) -> BinaryIO:
    """
    Open a file handle of the file object in binary mode.
    """

    path = self.get_path(name)
    return open(path, "rb")

write(file, name)

Write input file which is opened in binary mode to destination.

Source code in fastapi_storages/filesystem.py
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
def write(self, file: BinaryIO, name: str) -> str:
    """
    Write input file which is opened in binary mode to destination.
    """

    filename = self.get_name(name)
    path = self.get_path(filename)

    file.seek(0, 0)
    with open(path, "wb") as output:
        while True:
            chunk = file.read(self.default_chunk_size)
            if not chunk:
                break
            output.write(chunk)

    return str(path)

Bases: BaseStorage

Amazon S3 or any S3 compatible storage backend. You might want to use this with the FileType type. Requires boto3 to be installed.

Source code in fastapi_storages/s3.py
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
class S3Storage(BaseStorage):
    """
    Amazon S3 or any S3 compatible storage backend.
    You might want to use this with the `FileType` type.
    Requires `boto3` to be installed.
    """

    default_content_type = "application/octet-stream"

    AWS_ACCESS_KEY_ID = os.environ.get("AWS_ACCESS_KEY_ID", "")
    """AWS access key ID. Either set here or as an environment variable."""

    AWS_SECRET_ACCESS_KEY = os.environ.get("AWS_SECRET_ACCESS_KEY", "")
    """AWS secret access key. Either set here or as an environment variable."""

    AWS_S3_BUCKET_NAME = ""
    """AWS S3 bucket name to use."""

    AWS_S3_ENDPOINT_URL = ""
    """AWS S3 endpoint URL."""

    AWS_S3_USE_SSL = True
    """Indicate if SSL should be used."""

    AWS_DEFAULT_ACL = ""
    """Optional ACL set on the object like `public-read`.
    By default file will be private."""

    AWS_QUERYSTRING_AUTH = False
    """Indicate if query parameter authentication should be used in URLs."""

    AWS_S3_CUSTOM_DOMAIN = ""
    """Custom domain to use for serving object URLs."""

    def __init__(self) -> None:
        assert boto3 is not None, "'boto3' is not installed"
        assert not self.AWS_S3_ENDPOINT_URL.startswith("http"), (
            "URL should not contain protocol"
        )

        self._http_scheme = "https" if self.AWS_S3_USE_SSL else "http"
        self._url = f"{self._http_scheme}://{self.AWS_S3_ENDPOINT_URL}"
        self._s3 = boto3.client(
            "s3",
            endpoint_url=self._url,
            use_ssl=self.AWS_S3_USE_SSL,
            aws_access_key_id=self.AWS_ACCESS_KEY_ID,
            aws_secret_access_key=self.AWS_SECRET_ACCESS_KEY,
        )

    def get_name(self, name: str) -> str:
        """
        Get the normalized name of the file.
        """

        filename = secure_filename(Path(name).name)
        return str(Path(name).with_name(filename))

    def get_path(self, name: str) -> str:
        """
        Get full URL to the file.
        """

        key = self.get_name(name)

        if self.AWS_S3_CUSTOM_DOMAIN:
            return "{}://{}/{}".format(
                self._http_scheme,
                self.AWS_S3_CUSTOM_DOMAIN,
                key,
            )

        if self.AWS_QUERYSTRING_AUTH:
            params = {"Bucket": self.AWS_S3_BUCKET_NAME, "Key": key}
            return self._s3.generate_presigned_url("get_object", Params=params)

        return "{}://{}/{}/{}".format(
            self._http_scheme,
            self.AWS_S3_ENDPOINT_URL,
            self.AWS_S3_BUCKET_NAME,
            key,
        )

    def get_size(self, name: str) -> int:
        """
        Get file size in bytes.
        """

        key = self.get_name(name)
        return self._s3.head_object(Bucket=self.AWS_S3_BUCKET_NAME, Key=key)[
            "ContentLength"
        ]

    def write(self, file: BinaryIO, name: str) -> str:
        """
        Write input file which is opened in binary mode to destination.
        """

        file.seek(0, 0)
        key = self.get_name(name)
        content_type, _ = mimetypes.guess_type(key)
        params = {
            "ACL": self.AWS_DEFAULT_ACL,
            "ContentType": content_type or self.default_content_type,
        }
        self._s3.upload_fileobj(file, self.AWS_S3_BUCKET_NAME, key, ExtraArgs=params)
        return key

    def delete(self, name: str) -> None:
        """
        Delete the file from S3
        """

        self._s3.delete_object(Bucket=self.AWS_S3_BUCKET_NAME, Key=self.get_name(name))

    def generate_new_filename(self, filename: str) -> str:
        key = self.get_name(filename)
        stem = Path(filename).stem
        suffix = Path(filename).suffix
        counter = 0

        while self._check_object_exists(key):
            counter += 1
            filename = f"{stem}_{counter}{suffix}"
            key = self.get_name(filename)

        return filename

    def _check_object_exists(self, key: str) -> bool:
        try:
            self._s3.head_object(Bucket=self.AWS_S3_BUCKET_NAME, Key=key)
        except boto3.exceptions.botocore.exceptions.ClientError as e:
            if e.response["Error"]["Code"] == "404":
                return False

        return True

AWS_ACCESS_KEY_ID = os.environ.get('AWS_ACCESS_KEY_ID', '') class-attribute instance-attribute

AWS access key ID. Either set here or as an environment variable.

AWS_DEFAULT_ACL = '' class-attribute instance-attribute

Optional ACL set on the object like public-read. By default file will be private.

AWS_QUERYSTRING_AUTH = False class-attribute instance-attribute

Indicate if query parameter authentication should be used in URLs.

AWS_S3_BUCKET_NAME = '' class-attribute instance-attribute

AWS S3 bucket name to use.

AWS_S3_CUSTOM_DOMAIN = '' class-attribute instance-attribute

Custom domain to use for serving object URLs.

AWS_S3_ENDPOINT_URL = '' class-attribute instance-attribute

AWS S3 endpoint URL.

AWS_S3_USE_SSL = True class-attribute instance-attribute

Indicate if SSL should be used.

AWS_SECRET_ACCESS_KEY = os.environ.get('AWS_SECRET_ACCESS_KEY', '') class-attribute instance-attribute

AWS secret access key. Either set here or as an environment variable.

delete(name)

Delete the file from S3

Source code in fastapi_storages/s3.py
123
124
125
126
127
128
def delete(self, name: str) -> None:
    """
    Delete the file from S3
    """

    self._s3.delete_object(Bucket=self.AWS_S3_BUCKET_NAME, Key=self.get_name(name))

get_name(name)

Get the normalized name of the file.

Source code in fastapi_storages/s3.py
65
66
67
68
69
70
71
def get_name(self, name: str) -> str:
    """
    Get the normalized name of the file.
    """

    filename = secure_filename(Path(name).name)
    return str(Path(name).with_name(filename))

get_path(name)

Get full URL to the file.

Source code in fastapi_storages/s3.py
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
def get_path(self, name: str) -> str:
    """
    Get full URL to the file.
    """

    key = self.get_name(name)

    if self.AWS_S3_CUSTOM_DOMAIN:
        return "{}://{}/{}".format(
            self._http_scheme,
            self.AWS_S3_CUSTOM_DOMAIN,
            key,
        )

    if self.AWS_QUERYSTRING_AUTH:
        params = {"Bucket": self.AWS_S3_BUCKET_NAME, "Key": key}
        return self._s3.generate_presigned_url("get_object", Params=params)

    return "{}://{}/{}/{}".format(
        self._http_scheme,
        self.AWS_S3_ENDPOINT_URL,
        self.AWS_S3_BUCKET_NAME,
        key,
    )

get_size(name)

Get file size in bytes.

Source code in fastapi_storages/s3.py
 98
 99
100
101
102
103
104
105
106
def get_size(self, name: str) -> int:
    """
    Get file size in bytes.
    """

    key = self.get_name(name)
    return self._s3.head_object(Bucket=self.AWS_S3_BUCKET_NAME, Key=key)[
        "ContentLength"
    ]

write(file, name)

Write input file which is opened in binary mode to destination.

Source code in fastapi_storages/s3.py
108
109
110
111
112
113
114
115
116
117
118
119
120
121
def write(self, file: BinaryIO, name: str) -> str:
    """
    Write input file which is opened in binary mode to destination.
    """

    file.seek(0, 0)
    key = self.get_name(name)
    content_type, _ = mimetypes.guess_type(key)
    params = {
        "ACL": self.AWS_DEFAULT_ACL,
        "ContentType": content_type or self.default_content_type,
    }
    self._s3.upload_fileobj(file, self.AWS_S3_BUCKET_NAME, key, ExtraArgs=params)
    return key