1
+ //! This example shows how to create tags that expire after a certain time.
2
+ //!
3
+ //! We use a prefix so we can distinguish between expiring and normal tags, and
4
+ //! then encode the expiry date in the tag name after the prefix, in a format
5
+ //! that sorts in the same order as the expiry date.
6
+ //!
7
+ //! Then we can just use
1
8
use std:: time:: { Duration , SystemTime } ;
2
9
3
10
use chrono:: Utc ;
4
11
use futures_lite:: StreamExt ;
5
12
use iroh:: endpoint;
6
- use iroh_blobs:: store:: GcConfig ;
7
- use iroh_blobs:: { hashseq:: HashSeq , BlobFormat , HashAndFormat } ;
8
- use iroh_blobs:: Hash ;
9
-
10
- use iroh_blobs:: rpc:: client:: blobs:: MemClient as BlobsClient ;
13
+ use iroh_blobs:: {
14
+ hashseq:: HashSeq , rpc:: client:: blobs:: MemClient as BlobsClient , store:: GcConfig , BlobFormat ,
15
+ Hash , HashAndFormat , Tag ,
16
+ } ;
11
17
use tokio:: signal:: ctrl_c;
12
18
13
19
/// Using an iroh rpc client, create a tag that is marked to expire at `expiry` for all the given hashes.
14
20
///
15
21
/// The tag name will be `prefix`- followed by the expiry date in iso8601 format (e.g. `expiry-2025-01-01T12:00:00Z`).
16
- ///
17
22
async fn create_expiring_tag (
18
23
iroh : & BlobsClient ,
19
24
hashes : & [ Hash ] ,
@@ -40,65 +45,78 @@ async fn create_expiring_tag(
40
45
Ok ( ( ) )
41
46
}
42
47
43
- async fn delete_expired_tags ( iroh : & BlobsClient , prefix : & str ) -> anyhow:: Result < ( ) > {
44
- let mut tags = iroh . tags ( ) . list ( ) . await ?;
48
+ async fn delete_expired_tags ( blobs : & BlobsClient , prefix : & str , bulk : bool ) -> anyhow:: Result < ( ) > {
49
+ let mut tags = blobs . tags ( ) . list ( ) . await ?;
45
50
let prefix = format ! ( "{}-" , prefix) ;
46
51
let now = chrono:: Utc :: now ( ) ;
47
- let mut to_delete = Vec :: new ( ) ;
48
- while let Some ( tag) = tags. next ( ) . await {
49
- let tag = tag?. name ;
50
- if let Some ( rest) = tag. 0 . strip_prefix ( prefix. as_bytes ( ) ) {
51
- let Ok ( expiry) = std:: str:: from_utf8 ( rest) else {
52
- tracing:: warn!( "Tag {} does have non utf8 expiry" , tag) ;
53
- continue ;
54
- } ;
55
- let Ok ( expiry) = chrono:: DateTime :: parse_from_rfc3339 ( expiry) else {
56
- tracing:: warn!( "Tag {} does have invalid expiry date" , tag) ;
57
- continue ;
58
- } ;
59
- let expiry = expiry. with_timezone ( & Utc ) ;
60
- if expiry < now {
61
- to_delete. push ( tag) ;
52
+ let end = format ! (
53
+ "{}-{}" ,
54
+ prefix,
55
+ now. to_rfc3339_opts( chrono:: SecondsFormat :: Secs , true )
56
+ ) ;
57
+ if bulk {
58
+ // delete all tags with the prefix and an expiry date before now
59
+ //
60
+ // this should be very efficient, since it is just a single database operation
61
+ blobs
62
+ . tags ( )
63
+ . delete_range ( Tag :: from ( prefix. clone ( ) ) ..Tag :: from ( end) )
64
+ . await ?;
65
+ } else {
66
+ // find tags to delete one by one and then delete them
67
+ //
68
+ // this allows us to print the tags before deleting them
69
+ let mut to_delete = Vec :: new ( ) ;
70
+ while let Some ( tag) = tags. next ( ) . await {
71
+ let tag = tag?. name ;
72
+ if let Some ( rest) = tag. 0 . strip_prefix ( prefix. as_bytes ( ) ) {
73
+ let Ok ( expiry) = std:: str:: from_utf8 ( rest) else {
74
+ tracing:: warn!( "Tag {} does have non utf8 expiry" , tag) ;
75
+ continue ;
76
+ } ;
77
+ let Ok ( expiry) = chrono:: DateTime :: parse_from_rfc3339 ( expiry) else {
78
+ tracing:: warn!( "Tag {} does have invalid expiry date" , tag) ;
79
+ continue ;
80
+ } ;
81
+ let expiry = expiry. with_timezone ( & Utc ) ;
82
+ if expiry < now {
83
+ to_delete. push ( tag) ;
84
+ }
62
85
}
63
86
}
64
- }
65
- for tag in to_delete {
66
- println ! ( "Deleting expired tag {}" , tag) ;
67
- iroh . tags ( ) . delete ( tag ) . await ? ;
87
+ for tag in to_delete {
88
+ println ! ( "Deleting expired tag {}" , tag ) ;
89
+ blobs . tags ( ) . delete ( tag) . await ? ;
90
+ }
68
91
}
69
92
Ok ( ( ) )
70
93
}
71
94
72
- async fn print_tags_task ( blobs : BlobsClient ) -> anyhow:: Result < ( ) > {
95
+ async fn info_task ( blobs : BlobsClient ) -> anyhow:: Result < ( ) > {
96
+ tokio:: time:: sleep ( Duration :: from_secs ( 1 ) ) . await ;
73
97
loop {
74
98
let now = chrono:: Utc :: now ( ) ;
75
99
let mut tags = blobs. tags ( ) . list ( ) . await ?;
76
- println ! ( "Tags at {}:\n " , now) ;
100
+ println ! ( "Current time: {}" , now. to_rfc3339_opts( chrono:: SecondsFormat :: Secs , true ) ) ;
101
+ println ! ( "Tags:" ) ;
77
102
while let Some ( tag) = tags. next ( ) . await {
78
103
let tag = tag?;
79
104
println ! ( " {:?}" , tag) ;
80
105
}
81
- println ! ( ) ;
82
- tokio:: time:: sleep ( Duration :: from_secs ( 5 ) ) . await ;
83
- }
84
- }
85
-
86
- async fn print_blobs_task ( blobs : BlobsClient ) -> anyhow:: Result < ( ) > {
87
- loop {
88
- let now = chrono:: Utc :: now ( ) ;
89
106
let mut blobs = blobs. list ( ) . await ?;
90
- println ! ( "Blobs at {}: \n " , now ) ;
107
+ println ! ( "Blobs:" ) ;
91
108
while let Some ( info) = blobs. next ( ) . await {
92
- println ! ( " {:?}" , info?) ;
109
+ let info = info?;
110
+ println ! ( " {} {} bytes" , info. hash, info. size) ;
93
111
}
94
112
println ! ( ) ;
95
113
tokio:: time:: sleep ( Duration :: from_secs ( 5 ) ) . await ;
96
114
}
97
115
}
98
116
99
- async fn delete_expired_tags_task ( blobs : BlobsClient , prefix : & str , ) -> anyhow:: Result < ( ) > {
117
+ async fn delete_expired_tags_task ( blobs : BlobsClient , prefix : & str ) -> anyhow:: Result < ( ) > {
100
118
loop {
101
- delete_expired_tags ( & blobs, prefix) . await ?;
119
+ delete_expired_tags ( & blobs, prefix, false ) . await ?;
102
120
tokio:: time:: sleep ( Duration :: from_secs ( 5 ) ) . await ;
103
121
}
104
122
}
@@ -108,8 +126,7 @@ async fn main() -> anyhow::Result<()> {
108
126
tracing_subscriber:: fmt:: init ( ) ;
109
127
let endpoint = endpoint:: Endpoint :: builder ( ) . bind ( ) . await ?;
110
128
let store = iroh_blobs:: store:: fs:: Store :: load ( "blobs" ) . await ?;
111
- let blobs = iroh_blobs:: net_protocol:: Blobs :: builder ( store)
112
- . build ( & endpoint) ;
129
+ let blobs = iroh_blobs:: net_protocol:: Blobs :: builder ( store) . build ( & endpoint) ;
113
130
// enable gc with a short period
114
131
blobs. start_gc ( GcConfig {
115
132
period : Duration :: from_secs ( 1 ) ,
@@ -120,36 +137,44 @@ async fn main() -> anyhow::Result<()> {
120
137
// You can skip this if you don't want to serve the data over the network.
121
138
let router = iroh:: protocol:: Router :: builder ( endpoint)
122
139
. accept ( iroh_blobs:: ALPN , blobs. clone ( ) )
123
- . spawn ( ) . await ?;
140
+ . spawn ( )
141
+ . await ?;
124
142
125
143
// setup: add some data and tag it
126
144
{
127
145
// add several blobs and tag them with an expiry date 10 seconds in the future
128
146
let batch = blobs. client ( ) . batch ( ) . await ?;
129
147
let a = batch. add_bytes ( "blob 1" . as_bytes ( ) ) . await ?;
130
148
let b = batch. add_bytes ( "blob 2" . as_bytes ( ) ) . await ?;
131
- let expires_at = SystemTime :: now ( ) . checked_add ( Duration :: from_secs ( 10 ) ) . unwrap ( ) ;
132
- create_expiring_tag ( blobs. client ( ) , & [ * a. hash ( ) , * b. hash ( ) ] , "expiring" , expires_at) . await ?;
149
+ let expires_at = SystemTime :: now ( )
150
+ . checked_add ( Duration :: from_secs ( 10 ) )
151
+ . unwrap ( ) ;
152
+ create_expiring_tag (
153
+ blobs. client ( ) ,
154
+ & [ * a. hash ( ) , * b. hash ( ) ] ,
155
+ "expiring" ,
156
+ expires_at,
157
+ )
158
+ . await ?;
133
159
134
160
// add a single blob and tag it with an expiry date 60 seconds in the future
135
161
let c = batch. add_bytes ( "blob 3" . as_bytes ( ) ) . await ?;
136
- let expires_at = SystemTime :: now ( ) . checked_add ( Duration :: from_secs ( 60 ) ) . unwrap ( ) ;
162
+ let expires_at = SystemTime :: now ( )
163
+ . checked_add ( Duration :: from_secs ( 60 ) )
164
+ . unwrap ( ) ;
137
165
create_expiring_tag ( blobs. client ( ) , & [ * c. hash ( ) ] , "expiring" , expires_at) . await ?;
138
166
// batch goes out of scope, so data is only protected by the tags we created
139
167
}
140
168
let client = blobs. client ( ) . clone ( ) ;
141
169
142
170
// delete expired tags every 5 seconds
143
- let check_task = tokio:: spawn ( delete_expired_tags_task ( client. clone ( ) , "expiring" ) ) ;
144
- // print tags every 5 seconds
145
- let print_tags_task = tokio:: spawn ( print_tags_task ( client. clone ( ) ) ) ;
146
- // print blobs every 5 seconds
147
- let print_blobs_task = tokio:: spawn ( print_blobs_task ( client) ) ;
171
+ let delete_task = tokio:: spawn ( delete_expired_tags_task ( client. clone ( ) , "expiring" ) ) ;
172
+ // print all tags and blobs every 5 seconds
173
+ let info_task = tokio:: spawn ( info_task ( client. clone ( ) ) ) ;
148
174
149
175
ctrl_c ( ) . await ?;
176
+ delete_task. abort ( ) ;
177
+ info_task. abort ( ) ;
150
178
router. shutdown ( ) . await ?;
151
- check_task. abort ( ) ;
152
- print_tags_task. abort ( ) ;
153
- print_blobs_task. abort ( ) ;
154
179
Ok ( ( ) )
155
- }
180
+ }
0 commit comments