1 /*
2  * Copyright 2017 Google Inc.
3  *
4  * Use of this source code is governed by a BSD-style license that can
5  * be found in the LICENSE file.
6  *
7  */
8 
9 //
10 //
11 //
12 
13 #include <stdbool.h>
14 
15 #include "extent_ring.h"
16 #include "macros.h"
17 
18 //
19 //
20 //
21 
22 void
skc_extent_ring_init(struct skc_extent_ring * const ring,skc_uint const size_pow2,skc_uint const size_snap,skc_uint const size_elem)23 skc_extent_ring_init(struct skc_extent_ring * const ring,
24                      skc_uint                 const size_pow2,
25                      skc_uint                 const size_snap,
26                      skc_uint                 const size_elem)
27 {
28   ring->head      = NULL;
29   ring->last      = NULL;
30 
31   ring->outer.rw  = (skc_uint2){ 0 };
32   ring->inner.rw  = (skc_uint2){ 0 };
33 
34   // FIXME -- assert size is pow2 -- either here or statically in the config
35 
36   ring->size.pow2 = size_pow2;
37   ring->size.mask = size_pow2 - 1;
38   ring->size.snap = size_snap;
39   ring->size.elem = size_elem;
40 }
41 
42 //
43 //
44 //
45 
46 skc_uint
skc_extent_ring_rem(struct skc_extent_ring const * const ring)47 skc_extent_ring_rem(struct skc_extent_ring const * const ring)
48 {
49   return ring->size.pow2 - (ring->outer.writes - ring->outer.reads);
50 }
51 
52 skc_bool
skc_extent_ring_is_full(struct skc_extent_ring const * const ring)53 skc_extent_ring_is_full(struct skc_extent_ring const * const ring)
54 {
55   return (ring->outer.writes - ring->outer.reads) == ring->size.pow2;
56 }
57 
58 skc_uint
skc_extent_ring_wip_count(struct skc_extent_ring const * const ring)59 skc_extent_ring_wip_count(struct skc_extent_ring const * const ring)
60 {
61   return ring->outer.writes - ring->inner.reads;
62 }
63 
64 skc_uint
skc_extent_ring_wip_rem(struct skc_extent_ring const * const ring)65 skc_extent_ring_wip_rem(struct skc_extent_ring const * const ring)
66 {
67   return SKC_MIN_MACRO(skc_extent_ring_rem(ring),ring->size.snap) - skc_extent_ring_wip_count(ring);
68 }
69 
70 skc_bool
skc_extent_ring_wip_is_full(struct skc_extent_ring const * const ring)71 skc_extent_ring_wip_is_full(struct skc_extent_ring const * const ring)
72 {
73   return skc_extent_ring_wip_count(ring) == SKC_MIN_MACRO(skc_extent_ring_rem(ring),ring->size.snap);
74 }
75 
76 skc_uint
skc_extent_ring_wip_index_inc(struct skc_extent_ring * const ring)77 skc_extent_ring_wip_index_inc(struct skc_extent_ring * const ring)
78 {
79   return ring->outer.writes++ & ring->size.mask;
80 }
81 
82 //
83 //
84 //
85 
86 void
skc_extent_ring_checkpoint(struct skc_extent_ring * const ring)87 skc_extent_ring_checkpoint(struct skc_extent_ring * const ring)
88 {
89   ring->inner.writes = ring->outer.writes;
90 }
91 
92 //
93 //
94 //
95 
96 struct skc_extent_ring_snap *
skc_extent_ring_snap_alloc(struct skc_runtime * const runtime,struct skc_extent_ring * const ring)97 skc_extent_ring_snap_alloc(struct skc_runtime     * const runtime,
98                            struct skc_extent_ring * const ring)
99 {
100   skc_subbuf_id_t id;
101 
102   struct skc_extent_ring_snap * snap =
103     skc_runtime_host_temp_alloc(runtime,
104                                 SKC_MEM_FLAGS_READ_WRITE,
105                                 sizeof(*snap),&id,NULL);
106   // save the id
107   snap->id      = id;
108 
109   // back point to parent
110   snap->ring    = ring;
111   snap->next    = NULL;
112 
113   // save the inner boundaries of the ring to the snapshot
114   snap->reads   = ring->inner.reads;
115   snap->writes  = ring->inner.reads = ring->inner.writes;
116 
117   // mark not free
118   snap->is_free = false;
119 
120   // attach snap to ring
121   if (ring->head == NULL)
122     {
123       ring->head = snap;
124       ring->last = snap;
125     }
126   else
127     {
128       ring->last->next = snap;
129       ring->last       = snap;
130     }
131 
132   return snap;
133 }
134 
135 //
136 //
137 //
138 
139 void
skc_extent_ring_snap_free(struct skc_runtime * const runtime,struct skc_extent_ring_snap * const snap)140 skc_extent_ring_snap_free(struct skc_runtime          * const runtime,
141                           struct skc_extent_ring_snap * const snap)
142 {
143   // snap will be lazily freed
144   snap->is_free = true;
145 
146   //
147   // if this snapshot is no longer referenced then try to dispose of
148   // the ring buffer's leading unreferenced snapshots
149   //
150   struct skc_extent_ring      * const ring = snap->ring;
151   struct skc_extent_ring_snap *       curr = ring->head;
152 
153   if (!curr->is_free)
154     return;
155 
156   do {
157     // increment read counter
158     ring->outer.reads = curr->writes;
159 
160     struct skc_extent_ring_snap * const next = curr->next;
161 
162     skc_runtime_host_temp_free(runtime,curr,curr->id);
163 
164     curr = next;
165 
166     // this was the last snap...
167     if (curr == NULL)
168       {
169         ring->last = NULL;
170         break;
171       }
172 
173     // is the next free?
174   } while (curr->is_free);
175 
176   // update head
177   ring->head = curr;
178 }
179 
180 //
181 //
182 //
183 
184 skc_uint
skc_extent_ring_snap_count(struct skc_extent_ring_snap const * const snap)185 skc_extent_ring_snap_count(struct skc_extent_ring_snap const * const snap)
186 {
187   return snap->writes - snap->reads;
188 }
189 
190 skc_uint
skc_extent_ring_snap_from(struct skc_extent_ring_snap const * const snap)191 skc_extent_ring_snap_from(struct skc_extent_ring_snap const * const snap)
192 {
193   return snap->reads & snap->ring->size.mask;
194 }
195 
196 skc_uint
skc_extent_ring_snap_to(struct skc_extent_ring_snap const * const snap)197 skc_extent_ring_snap_to(struct skc_extent_ring_snap const * const snap)
198 {
199   return snap->writes & snap->ring->size.mask;
200 }
201 
202 //
203 //
204 //
205