Ble.Framework.Storage.Core
0.0.0.1
See the version list below for details.
dotnet add package Ble.Framework.Storage.Core --version 0.0.0.1
NuGet\Install-Package Ble.Framework.Storage.Core -Version 0.0.0.1
<PackageReference Include="Ble.Framework.Storage.Core" Version="0.0.0.1" />
<PackageVersion Include="Ble.Framework.Storage.Core" Version="0.0.0.1" />
<PackageReference Include="Ble.Framework.Storage.Core" />
paket add Ble.Framework.Storage.Core --version 0.0.0.1
#r "nuget: Ble.Framework.Storage.Core, 0.0.0.1"
#:package Ble.Framework.Storage.Core@0.0.0.1
#addin nuget:?package=Ble.Framework.Storage.Core&version=0.0.0.1
#tool nuget:?package=Ble.Framework.Storage.Core&version=0.0.0.1
Ble.Framework.Storage
跨 OSS 中间件(AWS S3 / MinIO / 阿里云 OSS / 腾讯云 COS / Azure Blob)的统一对象存储抽象 + 本地文件系统实现 + 开箱即用的断点续传 helper。
一套接口,业务层一次写好;切换后端只需新增实现 + 调一个枚举。
目录
- 1. 设计目标 / 非目标
- 2. 整体架构
- 3. 核心概念
- 4. 项目结构(按文件解释)
- 5. 配置
- 6. DI 注册流
- 7. 命名与校验规则
- 8. 数据流详解
- 9. 本地实现的磁盘布局
- 10. 一致性与并发语义
- 11. 异常体系
- 12. 时间戳 / JSON 约定
- 13. 使用 Cookbook
- 14. 内置后端
- 15. 测试覆盖
- 16. 已知边界与不支持
1. 设计目标 / 非目标
目标
- 一套抽象走天下 ——
IBleObjectStore抽出 S3 / MinIO / OSS / COS / Blob 的共同语义(19 个方法),业务层切后端零改动 - 标准兼容 —— 单段 ETag = 内容 MD5;多段 ETag =
{md5}-{count}与 S3 完全兼容;分块上传族 5 个方法 1:1 对应 - 断点续传开箱即用 ——
IBleResumableUploader/IBleResumableDownloader高阶 helper:暂停 =CancellationToken,恢复 = 同CheckpointId - 业务零样板 —— DI 自动绑定默认实现;进度回调走
IProgress<T>;失败语义走类型化异常 - 实现可替换 —— 检查点存储、对象存储后端都走接口注入;业务方可换 Redis-backed checkpoint / minio backend
- 严格规范 —— 桶 / 键 / 分块号校验全部对齐 S3;时间戳一律 Unix 毫秒戳;JSON 一律 System.Text.Json
非目标
- 预签名 URL(HTTP 模型耦合,不放进抽象)
- 对象版本控制(业务自走 key 命名规约)
- 对象标签(用 UserMetadata 替代)
- 服务端加密 / 跨区域复制(落到具体后端能力)
- 高 IOPS 扩展(本地实现是 file-per-object,不适合 100K QPS 小对象)
2. 整体架构
┌─────────────────────────────────────────────────────────────┐
│ 业务层(Service / Controller) │
│ ↓ 注入 │
├─────────────────────────────────────────────────────────────┤
│ IBleObjectStore (协议层 - 19 方法) │
│ IBleResumableUploader / IBleResumableDownloader (高阶 helper)│
├─────────────────────────────────────────────────────────────┤
│ 实现层(按 BleCoreSettings.StorageType 路由默认实例) │
│ ┌─────────┬─────────────────────────┬──────────────┐ │
│ │ Local │ S3 (含 MinIO/R2/COS…) │ AliyunOss │ │
│ └─────────┴─────────────────────────┴──────────────┘ │
└─────────────────────────────────────────────────────────────┘
两个 NuGet 包:
Ble.Framework.Storage.Core—— 抽象层 + 模型 + 异常 + 校验 + 断点续传 helper(不依赖任何具体后端)Ble.Framework.Storage.ObjectStore—— 本地文件系统实现 + DI 注册 + Furion 模块入口
未来 MinIO 等只新增 Ble.Framework.Storage.Minio 类项目,实现 IBleObjectStore 即可,业务代码零改动。
3. 核心概念
| 概念 | 含义 |
|---|---|
| Bucket | 对象的命名空间。S3 规则:3-63 lowercase 字母数字连字符,不能形如 IP |
| Key | 对象唯一标识(全 bucket 内)。UTF-8 ≤ 1024 字节,不允许 ../. 段、不允许以 / 开头 |
| ETag | 对象内容指纹。单段上传 = 内容 MD5 十六进制;多段上传 = {MD5(各分块原始MD5串接)}-{count} |
| Range | HTTP-style 字节区间。From(start) / FromTo(start,end) / Suffix(N) |
| Multipart Upload | 大文件分块上传。Initiate → UploadPart 多次 → Complete。中间状态由 uploadId 标识 |
| Composite ETag | 多段上传完成后的 ETag。S3 算法:把每个 part 的 MD5(16 字节原始)拼起来再 MD5,加 -{count} 后缀 |
| Checkpoint | 高阶 helper 的进度持久化对象。CheckpointId 是业务方稳定标识,pause/resume 用同一 Id 续上 |
| Sidecar JSON | 本地实现的元数据存储方式:内容文件保持原样,旁边 <key>.meta.json 存 ETag/Size/UserMetadata 等 |
4. 项目结构(按文件解释)
Ble.Framework.Storage.Core
Ble.Framework.Storage.Core/
├── IBleObjectStore.cs # 19 方法的统一接口(详见第 7 节)
│
├── Constants/
│ └── BleObjectStoreConstants.cs # 桶名 3-63、键 ≤1024、分块 [1,10000]、批删 ≤1000 等
│
├── Validation/
│ └── BleObjectStoreValidator.cs # ValidateBucket / ValidateKey / ValidatePartNumber / ValidateUploadId
│
├── Exceptions/
│ └── BleObjectStorageException.cs # 8 个具名异常(详见第 11 节)
│
├── IO/
│ └── BleBoundedReadStream.cs # 限长只读流;Range 下载和分块切片共用
│
├── Models/
│ ├── BleByteRange.cs # struct,Range 工厂 + Resolve(objectSize) → (offset, length)
│ ├── BleBucketInfo.cs # 列桶项
│ ├── BleObjectInfo.cs # 列对象项
│ ├── BleObjectMetadata.cs # Stat / Get 返回的对象元数据
│ ├── BleObjectIdentifier.cs # 批量操作用的 (Bucket, Key) 元组
│ ├── BleCompletedPart.cs # Complete 时提交的 (PartNumber, ETag)
│ ├── BleMultipartPart.cs # ListParts 列表项
│ ├── BleMultipartUploadInfo.cs # ListMultipartUploads 列表项
│ ├── BleRemoveObjectError.cs # 批量删除单条失败信息
│ │
│ ├── Requests/
│ │ ├── BlePutObjectRequest.cs
│ │ ├── BleGetObjectRequest.cs # 含 Range
│ │ ├── BleCopyObjectRequest.cs # 含 ReplaceMetadata 开关
│ │ ├── BleListObjectsRequest.cs # Prefix / Delimiter / MaxKeys / ContinuationToken / StartAfter
│ │ ├── BleRemoveObjectsRequest.cs # 批量删除
│ │ ├── BleInitiateMultipartUploadRequest.cs
│ │ ├── BleUploadPartRequest.cs
│ │ ├── BleCompleteMultipartUploadRequest.cs
│ │ ├── BleAbortMultipartUploadRequest.cs
│ │ ├── BleListPartsRequest.cs
│ │ └── BleListMultipartUploadsRequest.cs
│ │
│ └── Responses/
│ ├── BlePutObjectResponse.cs
│ ├── BleGetObjectResponse.cs # IAsyncDisposable + Disposable,包元数据 + Stream
│ ├── BleListObjectsResponse.cs # Contents + CommonPrefixes + IsTruncated + NextContinuationToken
│ ├── BleRemoveObjectsResponse.cs # Deleted[] + Errors[]
│ ├── BleInitiateMultipartUploadResponse.cs
│ ├── BleUploadPartResponse.cs
│ ├── BleListPartsResponse.cs
│ └── BleListMultipartUploadsResponse.cs
│
└── Resumable/ # 断点续传 helper(协议无关)
├── IBleResumableUploader.cs # 接口
├── IBleResumableDownloader.cs # 接口
│
├── Impl/
│ ├── BleResumableUploader.cs # 分块上传 + 进度持久化 + pause/resume + 服务端状态校验
│ └── BleResumableDownloader.cs # Range GET + 进度持久化 + pause/resume + ETag 校验
│
├── Models/
│ ├── BleResumableUploadCheckpoint.cs # 上传检查点(包含 UploadId、PartSize、TotalSize、CompletedParts[])
│ ├── BleResumableDownloadCheckpoint.cs # 下载检查点(包含 ETag、TotalSize、DownloadedBytes、DestinationPath)
│ └── BleResumableUploadProgress.cs # 上传/下载 Progress struct
│
├── Options/
│ └── BleResumableOptions.cs # PartSize / BufferSize / CheckpointEveryBytes
│
├── Requests/
│ ├── BleResumableUploadRequest.cs # CheckpointId / Bucket / Key / Source / Progress
│ └── BleResumableDownloadRequest.cs # CheckpointId / Bucket / Key / DestinationPath / Progress
│
└── Stores/ # 检查点持久化
├── IBleResumableUploadCheckpointStore.cs
├── IBleResumableDownloadCheckpointStore.cs
└── Impl/
├── BleInMemoryResumableUploadCheckpointStore.cs # 进程内字典;进程重启即丢
├── BleInMemoryResumableDownloadCheckpointStore.cs
├── BleFileSystemResumableUploadCheckpointStore.cs # 走 BleJsonFileUtil 写盘;进程重启可恢复
└── BleFileSystemResumableDownloadCheckpointStore.cs
Ble.Framework.Storage.ObjectStore
Ble.Framework.Storage.ObjectStore/
├── Local/
│ ├── IBleLocalObjectStore.cs # 实现专属接口(强制注入用)
│ └── Impl/
│ ├── BleLocalObjectStore.cs # IBleLocalObjectStore + IBleObjectStore 主实现
│ ├── BleLocalObjectStoreLayout.cs # 磁盘布局(路径解析 + 防越界)
│ ├── BleLocalObjectStoreDocuments.cs # 4 个 sidecar JSON 模型(对象/桶/分块上传/分块)
│ └── BleContentTypeResolver.cs # 扩展名 → MIME 推断(FrozenDictionary,48 种常见类型)
│
├── Modules/
│ └── BleObjectStoreModule.cs # ConfigureService 入口;AdminModule 通过它挂载组件
│
├── Extensions/
│ └── BleObjectStoreExtensions.cs # AddBleLocalObjectStore() + AddBleResumableTransfer()
│
└── Options/
└── BleObjectStoreSettingsOptions.cs # BleObjectStoreSettings:Local + Resumable
5. 配置
appsettings.{env}.json:
{
"BleCoreSettings": {
"StorageType": "Local"
},
"BleObjectStoreSettings": {
"Local": {
"Root": "storage",
"DefaultListMaxKeys": 1000
},
"Resumable": {
"CheckpointRoot": ".checkpoints",
"UploadPartSize": 5242880,
"DownloadBufferSize": 4194304,
"DownloadCheckpointEveryBytes": 4194304
}
}
}
| 字段 | 默认值 | 含义 |
|---|---|---|
BleCoreSettings.StorageType |
Local |
默认 IBleObjectStore 绑定到哪个后端 |
BleObjectStoreSettings.Local.Root |
"storage" |
数据根目录;相对路径基于 IHostEnvironment.ContentRootPath,绝对路径直接使用 |
BleObjectStoreSettings.Local.DefaultListMaxKeys |
1000 |
ListObjectsAsync 不传 MaxKeys 时使用 |
BleObjectStoreSettings.Resumable.CheckpointRoot |
.checkpoints |
文件系统检查点根目录(相对 ContentRootPath) |
BleObjectStoreSettings.Resumable.UploadPartSize |
5 MB |
分块上传默认分块大小 |
BleObjectStoreSettings.Resumable.DownloadBufferSize |
4 MB |
下载读写缓冲块 |
BleObjectStoreSettings.Resumable.DownloadCheckpointEveryBytes |
4 MB |
下载累计写多少字节落一次 checkpoint |
6. DI 注册流
入口
AdminModule.cs 通过 Furion 组件机制挂上:
services.AddComponent<BleObjectStoreModule.ConfigureService>();
BleObjectStoreModule.ConfigureService 内部
public void Load(IServiceCollection services, ComponentContext _)
{
services.AddConfigurableOptions<BleObjectStoreSettingsOptions>(); // 1. 选项绑定
services.AddBleLocalObjectStore(); // 2. 本地 store
services.AddBleResumableTransfer(); // 3. 续传 helper
}
AddBleLocalObjectStore
services.AddSingleton<IBleLocalObjectStore>(sp => new BleLocalObjectStore(...)); // 始终注册
if (BleCoreModule.Settings.StorageType is BleObjectStorageType.Local)
{
services.AddSingleton<IBleObjectStore>(sp => sp.GetRequiredService<IBleLocalObjectStore>());
}
注入策略:默认场景注入
IBleObjectStore(跟随配置);想强制走本地(无视配置)注入IBleLocalObjectStore。
AddBleResumableTransfer
services.TryAddSingleton<IBleResumableUploadCheckpointStore>( /* FileSystem 版 */ );
services.TryAddSingleton<IBleResumableDownloadCheckpointStore>( /* FileSystem 版 */ );
services.TryAddSingleton<IBleResumableUploader>( /* 默认 PartSize 由 Resumable.UploadPartSize 决定 */ );
services.TryAddSingleton<IBleResumableDownloader>( /* BufferSize / CheckpointEveryBytes 同样 */ );
覆盖默认实现:业务方想用 Redis-backed / DB-backed 检查点存储,自行
services.AddSingleton<IBleResumableUploadCheckpointStore, MyImpl>(),会替换TryAddSingleton的默认。
最终 DI 图
IBleObjectStore ─┐
├─→ BleLocalObjectStore (单例)
IBleLocalObjectStore ─┘
IBleResumableUploadCheckpointStore → BleFileSystemResumableUploadCheckpointStore
IBleResumableDownloadCheckpointStore → BleFileSystemResumableDownloadCheckpointStore
IBleResumableUploader → BleResumableUploader (内部组合 IBleObjectStore + UploadCheckpointStore)
IBleResumableDownloader → BleResumableDownloader (内部组合 IBleObjectStore + DownloadCheckpointStore)
7. 命名与校验规则
BleObjectStoreValidator 在每个 API 入口都强制校验:
- Bucket —— 长度 3-63;正则
^[a-z0-9][a-z0-9-]{1,61}[a-z0-9]$;不允许--连续;不能形如 IP —— 抛BleInvalidBucketNameException - Key —— UTF-8 ≤ 1024 字节;不能
/开头;不能含./..段;不能含控制字符(除\t) —— 抛BleInvalidObjectKeyException - PartNumber ——
[1, 10000]—— 抛BleInvalidPartException - UploadId —— 非空非空白 —— 抛
BleMultipartUploadNotFoundException
完整接口能力矩阵:
| 类别 | 方法 | 备注 |
|---|---|---|
| 桶 | BucketExistsAsync / CreateBucketAsync / RemoveBucketAsync / ListBucketsAsync |
RemoveBucket 仅允许空桶 |
| 对象 | PutObjectAsync / GetObjectAsync(string,string) / GetObjectAsync(BleGetObjectRequest) |
后者支持 Range |
| 对象 | StatObjectAsync / ObjectExistsAsync / RemoveObjectAsync |
|
| 对象 | RemoveObjectsAsync / CopyObjectAsync / ListObjectsAsync |
批删 ≤ 1000 条 |
| 分块上传 | InitiateMultipartUploadAsync / UploadPartAsync / ListPartsAsync |
ListParts 用于查询续传进度 |
| 分块上传 | CompleteMultipartUploadAsync / AbortMultipartUploadAsync / ListMultipartUploadsAsync |
Complete 校验 ETag + 升序 + 唯一 |
8. 数据流详解
8.1 单段 Put
业务调用 BleLocalObjectStore 磁盘
│ │ │
│ PutObjectAsync(req) │ │
├─────────────────────────────────→│ │
│ │ 1. 校验 bucket / key │
│ │ 2. 确保 bucket 目录存在 │
│ │ 3. 解析对象路径(含越界检查) │
│ │ │
│ │ 4. 写临时文件(同时算 MD5) │
│ ├─────────────────────────────────→│ {key}.tmp.{guid}
│ │ │
│ │ 5. File.Move(temp, final, overwrite) ─→ 原子重命名
│ │ │
│ │ 6. 写 sidecar JSON │
│ ├─────────────────────────────────→│ .bleobjectstore/{bucket}/{key}.meta.json
│ │ (etag/size/contentType/lastModified/userMetadata)
│ │ │
│ ←── BlePutObjectResponse ────────│ │
关键点:内容文件 + sidecar 都走 "temp + atomic rename",同卷下读侧永不会观测到半写状态。MD5 流式计算(IncrementalHash),不缓冲全量数据。
8.2 Range Get(断点下载基础)
业务调用 BleLocalObjectStore 磁盘
│ │ │
│ GetObjectAsync(BleGetObjectRequest{ Range = From(1024) }) │
├─────────────────────────────────→│ │
│ │ 1. StatObjectAsync 拿 size │
│ ├─────────────────────────────────→│ 读 sidecar
│ │ 2. range.Resolve(size) → (1024, length)
│ │ 完全越界 → BleInvalidRangeException
│ │ 3. 打开 FileStream + Seek(1024) │
│ ├─────────────────────────────────→│
│ │ 4. 用 BleBoundedReadStream 包装 │
│ │ 限长 length 字节 │
│ ←── BleGetObjectResponse ────────│ │
│ { Metadata, Content (stream) } │
│ │ │
│ Content.CopyToAsync(httpRespBody) │
关键点:返回的 Metadata.Size 是对象总大小(不是 Range 长度);Content 是限长流,读到 length 字节后返回 0。BleBoundedReadStream 同时管理内层 FileStream 的释放(Dispose/DisposeAsync)。
8.3 Multipart Upload(协议层断点上传)
业务 IBleObjectStore 磁盘工作区
│ │ │
│ Initiate(bucket, key) │ │
├─────────────────────────→│ 生成 uploadId = guid │
│ │ 创建 .uploads/{uploadId}/ │
│ │ 写 manifest.json ├─→ {bucket} initiated, contentType, userMetadata
│ ←── { uploadId } ────────│ │
│ │ │
│ UploadPart(part #1, data)│ │
├─────────────────────────→│ 写 00001.part (流式 MD5) │
│ │ 写 00001.part.meta ├─→ etag/size/lastModified
│ ←── { etag1, size1 } ────│ │
│ ... (并发 / 任意顺序) │ │
│ UploadPart(part #N, data)│ │
│ │ │
│ Complete([(1,e1),...,(N,eN)]) │
├─────────────────────────→│ 校验: │
│ │ - 升序 + 不重复 │
│ │ - 每段服务端有且 ETag 匹配 │
│ │ 拼接所有 part 到 temp │
│ │ 算 composite ETag = MD5(原始MD5串接) + "-" + N
│ │ Move(temp, final) │
│ │ 写 sidecar │
│ │ 删 .uploads/{uploadId}/ │
│ ←── BlePutObjectResponse │ │
协议层 pause/resume = 客户端不主动调下一段:服务端的 .uploads/{uploadId}/ 目录原封不动等着;恢复时调 ListPartsAsync 看哪些段已上完,跳过即可。这就是断点上传的本质——协议状态是持久的,pause 是 client-side 概念。
8.4 高阶 Resumable Uploader 流程
IBleResumableUploader 把 8.3 + checkpoint 持久化 + 暂停/恢复包装成"调一次 UploadAsync 就完事"的 helper:
UploadAsync(req, ct):
┌─ 1. 加载 checkpoint(req.CheckpointId)
│ │
│ ├─ 没有 → goto 全新初始化
│ │
│ ├─ 有 → 校验 (Bucket/Key/TotalSize/PartSize)
│ │ 不匹配 → Abort 老任务 + 删 checkpoint → 全新初始化
│ │
│ └─ 校验通过 → ListPartsAsync
│ ├─ 服务端任务消失 → 删 checkpoint → 全新初始化
│ └─ 调和:只保留 server / client ETag 一致的段
│
├─ 2. 全新初始化(如需要)
│ │
│ └─ InitiateMultipartUploadAsync → 写 checkpoint
│
├─ 3. 顺序遍历 part 1..N:
│ │
│ └─ if (该段已在 checkpoint.CompletedParts) skip
│ else
│ seek source 到 (partNumber-1)*PartSize
│ 用 BleBoundedReadStream 切出 PartSize 字节
│ UploadPartAsync → 把 (PartNumber, ETag) 写回 checkpoint
│ report progress
│
├─ 4. CompleteMultipartUploadAsync(提交 checkpoint.CompletedParts)
│
└─ 5. 删除 checkpoint
中途 ct 取消 → ThrowIfCancellationRequested → checkpoint 已有最新已完成段 → 业务方下次同 CheckpointId 续上
8.5 高阶 Resumable Downloader 流程
DownloadAsync(req, ct):
┌─ 1. StatObjectAsync 拿 remote (ETag, Size)
│
├─ 2. 加载 checkpoint(req.CheckpointId)
│ 判断是否能续传:
│ - Bucket/Key/DestinationPath/ETag/TotalSize 全部匹配
│ - DestinationPath 文件存在 + 长度 ≥ checkpoint.DownloadedBytes
│ 任一不满足 → 截断目标文件、checkpoint = 0、写新 checkpoint
│
├─ 3. 打开目标文件 FileMode.Open + Seek + SetLength(checkpoint.DownloadedBytes)
│ (砍掉断点之后可能的脏数据)
│
├─ 4. GetObjectAsync(Range = From(checkpoint.DownloadedBytes))
│ 二次校验:response.Metadata.ETag 仍 = checkpoint.ETag
│ (断点检查到 GetObject 之间对象可能被替换)
│
├─ 5. 边读边写 + 累计字节,达到 CheckpointEveryBytes 落一次盘
│
├─ 6. 完成 → 删 checkpoint
│
└─ ct 取消 → flush + 落 checkpoint + 上抛
8.6 批量删除
部分失败模式:每条独立 try/catch,错误聚合到 Errors[],成功键聚合到 Deleted[]。
RemoveObjectsAsync({ Keys: [k1, k2, .., kN] }):
if N > 1000 → throw
foreach k:
try ValidateKey(k); RemoveObjectAsync(bucket, k)
success → Deleted += k
catch BleInvalidObjectKeyException → Errors += { Code="InvalidKey", Message }
catch IOException etc → Errors += { Code="IOError", Message }
return { Deleted, Errors }
9. 本地实现的磁盘布局
{Root}/
├── {bucket1}/ ← 桶目录(用户能直接 ls 查看的内容)
│ ├── path/to/file.txt ← 对象内容;key='path/to/file.txt'
│ └── another.bin
├── {bucket2}/
│ └── ...
└── .bleobjectstore/ ← 内部元数据(桶名规则禁止 '.' 开头,永不冲突)
├── {bucket1}/
│ ├── __bucket.json ← { creationDate: 1714752000000 }
│ ├── path/to/file.txt.meta.json ← { etag, size, contentType, lastModified, userMetadata }
│ ├── another.bin.meta.json
│ └── .uploads/ ← 进行中的分块上传任务工作区
│ └── {uploadId-guid-N}/
│ ├── manifest.json ← { bucket, key, contentType, initiated, userMetadata }
│ ├── 00001.part ← 分块内容
│ ├── 00001.part.meta ← { etag, size, lastModified }
│ ├── 00002.part
│ ├── 00002.part.meta
│ └── ...
└── {bucket2}/
└── ...
设计意图:
- 内容文件保持普通文件形态:运维直接
cp/cat/du/rsync不需要任何专用工具,bucket 目录就是普通目录。 - 元数据走 sidecar:删除 / 迁移可以直接
rm -rf .bleobjectstore而不破坏内容(最坏退化为只剩内容,下次 Stat 会即时计算 MD5 重建一份基本元数据)。 .bleobjectstore前缀 dot:桶名规则禁止以.开头,所以这个保留目录永不会与用户桶冲突。.uploads/{uploadId}/:每个上传任务隔离子目录,clean abort 只是rm -rf该子目录。- 写入一律 temp + atomic rename:同卷下不会读到半写入。
10. 一致性与并发语义
- 单进程内并发 Put 同 key —— 各自写自己的 temp,最后
File.Move(overwrite: true)顺序决定胜者;读侧从不观测半写文件 - 多进程并发 Put 同 key —— 同上,"最后写入获胜"。S3 也是这语义
- 同 key 并发 Put + Get —— Get 始终读到一个完整的、之前某次 Put 的快照。永远不会读到半段内容
- 同对象多个 multipart upload —— 每个 uploadId 独立工作区互不干扰。第一个 Complete 落地,后续 Complete 覆盖(最后写入获胜)
- 删除时的 Get —— Get 用
FileShare.Read打开;Windows 下 Read+Delete 共享允许,Linux 下 unlink 后已打开句柄继续可读 - 列表期间的写入 ——
EnumerateFiles是当时的快照视图,期间新建 / 删除的对象可能见也可能不见(与 S3 列表一致性弱保证一致) - Range GET 期间对象被覆盖 —— Range GET 已打开了 FileStream;继续读旧内容直到关闭。下次再 Get 才看到新内容
- Resumable Resume + 服务端清理 —— uploader 调用
ListPartsAsync抛BleMultipartUploadNotFoundException→ 自动重新 Initiate
11. 异常体系
BleObjectStorageException (基类,未具名错误兜底)
├── BleBucketNotFoundException 操作了不存在的桶
├── BleBucketAlreadyExistsException Create 时桶已存在
├── BleBucketNotEmptyException Remove 非空桶
├── BleObjectNotFoundException Get / Stat 不存在的对象
├── BleInvalidBucketNameException 桶名违反 S3 规则
├── BleInvalidObjectKeyException 对象键违反规则(含路径逃逸)
├── BleInvalidRangeException Range 完全落在对象之外(HTTP 416 等价)
├── BleMultipartUploadNotFoundException uploadId 不存在 / 已完成 / 已取消
└── BleInvalidPartException 分块号越界 / 重复 / ETag 不匹配 / 缺失
业务层处理建议:
BleBucketNotFound/BleObjectNotFound—— 404 —— 4xx 返给前端BleInvalidBucketName/BleInvalidObjectKey—— 400 —— 校验失败,前端拦BleInvalidRange—— 416 —— 客户端 Range 越界BleBucketAlreadyExists/BleBucketNotEmpty—— 409 —— 状态冲突BleInvalidPart/BleMultipartUploadNotFound—— 409 / 410 —— 客户端 / 服务端状态不一致BleObjectStorageException(基类)—— 500 —— IO / 配置问题,retry
12. 时间戳 / JSON 约定
- 时间戳:所有 model 的
LastModified/CreationDate/Initiated都是longUnix 毫秒戳。生成走BleClockUtil.UnixMs,需要展示再BleClockUtil.FromUnixMs(ts)还原。跨语言、跨序列化、对比都不会有时区或格式歧义。 - JSON:项目统一 System.Text.Json,所有调用走
BleJsonExtensions;文件级原子读写走BleJsonFileUtil。Storage 内部不裸调JsonSerializer.X。 - long ↔ string:
BleLongJsonConverter默认注册到BleJsonExtensions.Default,避免 JS 客户端 number 精度丢失(雪花 Id、Unix ms 都安全)。
13. 使用 Cookbook
13.1 注入默认实现(最常见)
public class FileService(IBleObjectStore store)
{
public async Task<string> SaveAsync(string bucket, string key, Stream content)
{
var resp = await store.PutObjectAsync(new BlePutObjectRequest
{
Bucket = bucket,
Key = key,
Data = content
});
return resp.ETag;
}
}
13.2 强制本地实现(与默认配置无关)
public class LogArchiveService(IBleLocalObjectStore localStore)
{
public Task ArchiveAsync(...) => localStore.PutObjectAsync(...);
}
13.3 桶操作
if (!await store.BucketExistsAsync("uploads"))
await store.CreateBucketAsync("uploads");
IReadOnlyList<BleBucketInfo> buckets = await store.ListBucketsAsync();
13.4 上传 + 用户元数据
await store.PutObjectAsync(new BlePutObjectRequest
{
Bucket = "uploads",
Key = "user/42/avatar.png",
Data = fileStream,
ContentType = "image/png", // 不传则按扩展名推断
UserMetadata = new Dictionary<string, string>
{
["uploaded-by"] = userId.ToString(),
["origin"] = "web"
}
});
13.5 Range 下载(HTTP Range 转发)
var range = BleByteRange.FromTo(1000, 1999); // 客户端 Range: bytes=1000-1999
await using var resp = await store.GetObjectAsync(new BleGetObjectRequest
{
Bucket = "videos",
Key = "movie.mp4",
Range = range
});
httpResponse.Headers["Content-Length"] = "1000";
httpResponse.Headers["Content-Range"] = $"bytes 1000-1999/{resp.Metadata.Size}";
await resp.Content.CopyToAsync(httpResponse.Body);
工厂方法:
BleByteRange.From(start)——[start, EOF]BleByteRange.FromTo(start, end)—— 闭区间[start, end]BleByteRange.Suffix(N)—— 末尾 N 字节
13.6 高阶 Resumable Upload(业务方推荐用法)
public class UploadController(IBleResumableUploader uploader)
{
[HttpPost]
public Task<BlePutObjectResponse> Upload(IFormFile file, string ckId, CancellationToken pauseToken)
{
return uploader.UploadAsync(new BleResumableUploadRequest
{
CheckpointId = ckId, // 业务方稳定 Id(用户 Id + 文件名等)
Bucket = "uploads",
Key = $"user/{userId}/{file.FileName}",
Source = file.OpenReadStream(),
ContentType = file.ContentType,
Progress = new Progress<BleResumableUploadProgress>(p =>
hub.Clients.User(userId).SendAsync("upload-progress", p))
}, pauseToken);
}
}
- 暂停:取消
pauseToken。已完成的分块和检查点都保留。 - 恢复:用同一
CheckpointId再调一次UploadAsync。 - 放弃:
uploader.AbortAsync(ckId),同时清服务端分块状态 + 本地检查点。
13.7 高阶 Resumable Download
public class DownloadJob(IBleResumableDownloader downloader)
{
public Task FetchAsync(string ckId, CancellationToken pauseToken)
=> downloader.DownloadAsync(new BleResumableDownloadRequest
{
CheckpointId = ckId,
Bucket = "exports",
Key = "2025-Q1.zip",
DestinationPath = "/var/data/exports/2025-Q1.zip",
Progress = new Progress<BleResumableDownloadProgress>(p =>
Console.WriteLine($"{p.DownloadedBytes}/{p.TotalBytes}"))
}, pauseToken);
}
13.8 协议层 Multipart(自己控分片节奏)
var init = await store.InitiateMultipartUploadAsync(new BleInitiateMultipartUploadRequest
{
Bucket = "uploads",
Key = "datasets/2025-Q1.csv"
});
var partETags = new List<BleCompletedPart>();
foreach (var (n, slice) in EnumerateSlices(file, partSize: 5 * 1024 * 1024))
{
var r = await store.UploadPartAsync(new BleUploadPartRequest
{
Bucket = "uploads", Key = "datasets/2025-Q1.csv",
UploadId = init.UploadId, PartNumber = n, Data = slice
});
partETags.Add(new BleCompletedPart(n, r.ETag));
}
await store.CompleteMultipartUploadAsync(new BleCompleteMultipartUploadRequest
{
Bucket = "uploads", Key = "datasets/2025-Q1.csv",
UploadId = init.UploadId, Parts = partETags
});
断网后自己续(不用高阶 helper 时):
var listed = await store.ListPartsAsync(new BleListPartsRequest
{
Bucket = "uploads", Key = key, UploadId = savedUploadId
});
var done = listed.Parts.ToDictionary(p => p.PartNumber, p => p.ETag);
foreach (var (n, slice) in EnumerateSlices(file, partSize: 5 * 1024 * 1024))
{
if (done.ContainsKey(n)) continue; // 跳过已上完的
await store.UploadPartAsync(...);
}
await store.CompleteMultipartUploadAsync(...);
13.9 周期巡检僵尸上传任务
var stale = await store.ListMultipartUploadsAsync(new BleListMultipartUploadsRequest { Bucket = "uploads" });
long now = BleClockUtil.UnixMs;
long sevenDaysMs = TimeSpan.FromDays(7).Ticks / TimeSpan.TicksPerMillisecond;
foreach (var u in stale.Uploads.Where(u => now - u.Initiated > sevenDaysMs))
{
await store.AbortMultipartUploadAsync(new BleAbortMultipartUploadRequest
{
Bucket = u.Bucket, Key = u.Key, UploadId = u.UploadId
});
}
13.10 批量删除
var r = await store.RemoveObjectsAsync(new BleRemoveObjectsRequest
{
Bucket = "uploads",
Keys = new[] { "tmp/a.bin", "tmp/b.bin", "tmp/c.bin" }
});
logger.LogInformation("deleted {Ok}, failed {Fail}", r.Deleted.Count, r.Errors.Count);
foreach (var err in r.Errors)
logger.LogWarning("delete {Key} failed: {Code} {Message}", err.Key, err.Code, err.Message);
13.11 列举:分页 / 前缀 / 树状
// 翻页
BleListObjectsResponse page = await store.ListObjectsAsync(new BleListObjectsRequest { Bucket = "b", MaxKeys = 100 });
while (page.IsTruncated)
{
page = await store.ListObjectsAsync(new BleListObjectsRequest
{
Bucket = "b",
MaxKeys = 100,
ContinuationToken = page.NextContinuationToken
});
}
// 树状(前缀 + 分隔符)
var tree = await store.ListObjectsAsync(new BleListObjectsRequest
{
Bucket = "b",
Prefix = "logs/",
Delimiter = "/"
});
// tree.Contents → logs/ 下的"叶子"对象
// tree.CommonPrefixes → 形如 "logs/2025/" 的"子目录"
13.12 自定义 checkpoint 持久化(Redis-backed)
public class RedisUploadCheckpointStore(IConnectionMultiplexer redis) : IBleResumableUploadCheckpointStore
{
public async Task SaveAsync(BleResumableUploadCheckpoint cp, CancellationToken ct = default)
{
var db = redis.GetDatabase();
await db.StringSetAsync($"upload-ck:{cp.CheckpointId}", cp.ToJson(), TimeSpan.FromDays(30));
}
public async Task<BleResumableUploadCheckpoint?> LoadAsync(string id, CancellationToken ct = default)
{
var db = redis.GetDatabase();
var json = await db.StringGetAsync($"upload-ck:{id}");
return json.IsNullOrEmpty ? null : json.ToString().Parse<BleResumableUploadCheckpoint>();
}
public Task DeleteAsync(string id, CancellationToken ct = default)
=> redis.GetDatabase().KeyDeleteAsync($"upload-ck:{id}");
}
// 在 Module 之后覆盖默认
services.AddSingleton<IBleResumableUploadCheckpointStore, RedisUploadCheckpointStore>();
TryAddSingleton 让默认 FileSystem 版只在没有显式注册时生效,所以业务方覆盖即可。
14. 内置后端
四个后端开箱即用,业务方代码零改动,切换只需改 BleCoreSettings.StorageType + 挂相应的 ConfigureService。
Local—— 本地文件系统 ——Ble.Framework.Storage.ObjectStore—— 直写磁盘 + sidecar JSONS3—— AWS S3 / 任意 S3 兼容(含 MinIO / R2 / B2 / 腾讯 COS / DigitalOcean Spaces)——Ble.Framework.Storage.S3—— AWSSDK.S3;通过ServiceUrl+ForcePathStyle切换具体后端AliyunOss—— 阿里云 OSS ——Ble.Framework.Storage.AliyunOss—— Aliyun.OSS.SDK.NetCore,同步 API 统一Task.Run包装
14.1 本地(Local)
// AdminModule.cs
services.AddComponent<BleObjectStoreModule.ConfigureService>();
{
"BleCoreSettings": { "StorageType": "Local" },
"BleObjectStoreSettings": { "Local": { "Root": "storage" } }
}
14.2 AWS S3 / 任意 S3 兼容
services.AddComponent<BleS3ObjectStoreModule.ConfigureService>();
appsettings:
{
"BleCoreSettings": { "StorageType": "S3" },
"BleS3ObjectStoreSettings": {
"AccessKey": "AKIA...",
"SecretKey": "secret...",
"Region": "us-east-1"
}
}
接其它 S3 兼容服务(保持 StorageType: "S3"),改 ServiceUrl + 必要时 ForcePathStyle:
- AWS S3 —— ServiceUrl 留空(仅设
Region) ——ForcePathStyle = false - Cloudflare R2 ——
https://{account}.r2.cloudflarestorage.com——ForcePathStyle = false - Backblaze B2 ——
https://s3.{region}.backblazeb2.com——ForcePathStyle = false - 腾讯云 COS ——
https://cos.{region}.myqcloud.com——ForcePathStyle = false - DigitalOcean Spaces ——
https://{region}.digitaloceanspaces.com——ForcePathStyle = false - MinIO ——
http://minio.example.com:9000——ForcePathStyle = true
业务方注入:IBleObjectStore(默认)或 IBleS3ObjectStore(强制 S3 兼容后端)。
14.3 MinIO(走 S3 兼容路径)
MinIO 服务端是 100% AWS S3 协议兼容,所以统一走 Storage.S3,无需独立实现。
services.AddComponent<BleS3ObjectStoreModule.ConfigureService>();
appsettings:
{
"BleCoreSettings": { "StorageType": "S3" },
"BleS3ObjectStoreSettings": {
"ServiceUrl": "http://minio.example.com:9000",
"ForcePathStyle": true,
"AccessKey": "minioadmin",
"SecretKey": "minioadmin"
}
}
之前曾有独立
Storage.Minio项目,但它本质就是Storage.S3 + ForcePathStyle=true,纯重复代码已删除。MinIO 用户改用上面的配置即可。
14.4 阿里云 OSS
services.AddComponent<BleAliyunOssObjectStoreModule.ConfigureService>();
appsettings:
{
"BleCoreSettings": { "StorageType": "AliyunOss" },
"BleAliyunOssObjectStoreSettings": {
"Endpoint": "https://oss-cn-hangzhou.aliyuncs.com",
"AccessKeyId": "LTAI...",
"AccessKeySecret": "secret..."
}
}
业务方注入:IBleObjectStore 或 IBleAliyunOssObjectStore。
阿里云 OSS SDK 是同步 API,实现上统一用
Task.Run包装到Task<T>;OssException.ErrorCode映射到统一异常族(NoSuchBucket/NoSuchKey/NoSuchUpload/ 等)。
14.5 接入新后端(自定义实现)
如果想接入 Azure Blob、Qiniu、华为 OBS 等其它后端,按以下 4 步:
步骤 1:在 BleObjectStorageType 加枚举值
[Description("Azure Blob")] Azure = 5,
步骤 2:新建项目 Ble.Framework.Storage.Azure,结构对齐其它后端:
Ble.Framework.Storage.Azure/
├── Ble.Framework.Storage.Azure.csproj # 引用 Storage.Core + Azure SDK
├── Azure/
│ ├── IBleAzureObjectStore.cs # 标记接口(强制注入用)
│ └── Impl/
│ ├── BleAzureObjectStore.cs # 19 个方法翻译成 Azure SDK
│ └── BleAzureExceptionMapper.cs
├── Modules/BleAzureObjectStoreModule.cs
├── Extensions/BleAzureObjectStoreExtensions.cs
└── Options/BleAzureObjectStoreSettingsOptions.cs
步骤 3:实现 IBleObjectStore 的 19 个方法,翻译成对应 SDK 调用 + 异常映射。可参考 BleAliyunOssObjectStore 作为模板。
步骤 4:注册扩展 + AdminModule 挂载 + appsettings 配置。
业务代码零改动;IBleResumableUploader / IBleResumableDownloader 自动复用新后端(它们只调 IBleObjectStore 标准方法)。
15. 测试覆盖
dotnet test tests/Ble.Framework.Storage.Tests/Ble.Framework.Storage.Tests.csproj
116 个用例,分九组:
BleObjectStoreValidatorTests—— 25 —— 桶 / 键 / 分块号 / uploadId 全部边界BleByteRangeTests—— 10 —— 三种工厂(From / FromTo / Suffix)+ Resolve 越界与裁剪BleLocalObjectStoreBucketTests—— 7 —— exists / create / remove / list + 排除内部 .bleobjectstoreBleLocalObjectStoreObjectTests—— 13 —— put / get / stat / exists / remove + ETag / 元数据 / 嵌套路径 / 路径逃逸 / 空目录回收BleLocalObjectStoreListTests—— 6 —— prefix / delimiter 分组 / 翻页 / StartAfter / delimiter+pagination 跨页跳过BleLocalObjectStoreCopyTests—— 3 —— 默认沿用源元数据 + ReplaceMetadata + 源缺失BleLocalObjectStoreRangeTests—— 7 —— 三种 Range + 末端裁剪 + EOF 越界 + 真实"断网恢复"对照BleLocalObjectStoreMultipartTests—— 11 —— initiate / 上传乱序后 list 排序 / complete 拼接 + 复合 ETag / ETag 不匹配 / 升序+去重 / 空 parts / abort / list uploads / 真实"上传断点续传"对照BleLocalObjectStoreBatchTests—— 4 —— 混合存在/不存在键 / invalid key 单条不阻断 / 超过 1000 条上限拒绝BleResumableUploaderTests—— 8 —— 全量 / 暂停恢复 / 服务端任务消失自动重启 / 检查点不匹配自动重启 / 0 字节 / 不可 seek / Abort / 文件系统 store 进程重启恢复BleResumableDownloaderTests—— 6 —— 全量 / 暂停恢复 / ETag 变化重头开始 / 远端缺失 / Abort 删文件 / 文件系统 store 进程重启恢复
16. 已知边界与不支持
| 项 | 现状 |
|---|---|
| 预签名 URL | 不在 IBleObjectStore 中(与具体后端 HTTP 模型耦合);后续走单独 capability 接口 |
| 对象版本控制 | 不暴露;业务方需要时自行用 key 命名规约(key/v{N}) |
| 对象标签 (tagging) | 暂不暴露;用 UserMetadata 替代 |
| 跨桶服务端复制 | 已支持(CopyObjectAsync) |
| 对象生命周期 | 不内置;调度由业务侧 worker 负责 |
| 加密 | 本地实现不加密;上层可在 Stream 注入加密层 |
| 高 IOPS 小对象 | 本地实现是 file-per-object,不适合 100K QPS 小对象场景;走专用 KV/中间件 |
| 并发分块上传 | 当前 BleResumableUploader 是顺序的;业务有并发需求时建议直接调协议层 API |
附:相关常量速查
BleObjectStoreConstants:
DefaultContentType = "application/octet-stream"
MinBucketNameLength = 3
MaxBucketNameLength = 63
MaxObjectKeyBytes = 1024
DefaultListMaxKeys = 1000
MaxListMaxKeys = 1000
MinPartNumber = 1
MaxPartNumber = 10000
MaxBatchDeleteKeys = 1000
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net10.0 is compatible. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
-
net10.0
- Ble.Framework.Core (>= 0.0.0.1)
NuGet packages (3)
Showing the top 3 NuGet packages that depend on Ble.Framework.Storage.Core:
| Package | Downloads |
|---|---|
|
Ble.Framework.Storage.ObjectStore
Ble Framework |
|
|
Ble.Framework.Storage.S3
Ble Framework |
|
|
Ble.Framework.Storage.AliyunOss
Ble Framework |
GitHub repositories
This package is not used by any popular GitHub repositories.