LLFIO v2.00
llfio_v2_xxx::algorithm::shared_fs_mutex::atomic_append Class Reference

Scalable many entity shared/exclusive file system based lock.

#include "atomic_append.hpp"

Inheritance diagram for llfio_v2_xxx::algorithm::shared_fs_mutex::atomic_append:
llfio_v2_xxx::algorithm::shared_fs_mutex::shared_fs_mutex

Public Types

using entity_type = shared_fs_mutex::entity_type
 The type of an entity id.
 
using entities_type = shared_fs_mutex::entities_type
 The type of a sequence of entities.
 

Public Member Functions

 atomic_append (const atomic_append &)=delete
 No copy construction.
 
atomic_append & operator= (const atomic_append &)=delete
 No copy assignment.
 
 atomic_append (atomic_append &&o) noexcept
 Move constructor.
 
atomic_append & operator= (atomic_append &&o) noexcept
 Move assign.
 
const file_handle & handle () const noexcept
 Return the handle to file being used for this lock.
 
virtual void unlock (entities_type entities, unsigned long long hint) noexcept final
 Unlock a previously locked sequence of entities.
 
entity_type entity_from_buffer (const char *buffer, size_t bytes, bool exclusive=true) noexcept
 Generates an entity id from a sequence of bytes.
 
template<typename T >
entity_type entity_from_string (const std::basic_string< T > &str, bool exclusive=true) noexcept
 Generates an entity id from a string.
 
entity_type random_entity (bool exclusive=true) noexcept
 Generates a cryptographically random entity id.
 
void fill_random_entities (span< entity_type > seq, bool exclusive=true) noexcept
 Fills a sequence of entity ids with cryptographic randomness. Much faster than calling random_entity() individually.
 
result< entities_guard > lock (entities_type entities, deadline d=deadline(), bool spin_not_sleep=false) noexcept
 Lock all of a sequence of entities for exclusive or shared access.
 
result< entities_guard > lock (entity_type entity, deadline d=deadline(), bool spin_not_sleep=false) noexcept
 Lock a single entity for exclusive or shared access.
 
result< entities_guard > try_lock (entities_type entities) noexcept
 Try to lock all of a sequence of entities for exclusive or shared access.
 
result< entities_guard > try_lock (entity_type entity) noexcept
 Try to lock a single entity for exclusive or shared access.
 

Static Public Member Functions

static result< atomic_append > fs_mutex_append (const path_handle &base, path_view lockfile, bool nfs_compatibility=false, bool skip_hashing=false) noexcept
 Initialises a shared filing system mutex using the file at lockfile.
 

Protected Member Functions

virtual result< void > _lock (entities_guard &out, deadline d, bool spin_not_sleep) noexcept final
 

Detailed Description

Scalable many entity shared/exclusive file system based lock.

Lock files and byte ranges scale poorly to the number of items being concurrently locked, typically with an exponential drop-off in performance as the number of items being concurrently locked rises. This file system algorithm solves that problem using IPC via a shared append-only lock file.

  • Compatible with networked file systems (including NFS, if the special nfs_compatibility flag is true; note that turning this on is not free of cost if you don't need NFS compatibility).
  • Nearly constant time to number of entities being locked.
  • Nearly constant time to number of processes concurrently using the lock (i.e. number of waiters).
  • Can sleep until a lock becomes free in a power-efficient manner.
  • Sudden power loss during use is recovered from.

Caveats:

  • Much slower than byte_ranges for small numbers of waiters or entities.
  • Sudden process exit with locks held will deadlock all other users.
  • Maximum of twelve entities may be locked concurrently.
  • Wasteful of disk space if used on a non-extents based filing system (e.g. FAT32, ext3). It is best used in /tmp if possible (file_handle::temp_file()). If you really must use a non-extents based filing system, destroy and recreate the object instance periodically to force resetting the lock file's length to zero.
  • Similarly, older operating systems (e.g. Linux < 3.0) do not implement extent hole punching and therefore will also see excessive disk space consumption. Note that at the time of writing OS X doesn't implement hole punching at all.
  • If your OS doesn't have sane byte range locks (OS X, BSD, older Linuxes) and multiple objects in your process use the same lock file, misoperation will occur. Use lock_files instead.

    Todo:

    Implement hole punching once I port that code from LLFIO v1.

    Decide on some resolution mechanism for sudden process exit.

    There is a 1 out of 2^64-2 chance of unique id collision. It would be nice if we actually formally checked that our chosen unique id is actually unique.
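
For orientation, here is a minimal usage sketch, not part of the generated reference. It assumes LLFIO_V2_NAMESPACE is the usual versioned-namespace macro, that a default-constructed path_handle denotes the current working directory, and that the names mylockfile and my_resource are purely illustrative:

#include "llfio/v2.0/algorithm/shared_fs_mutex/atomic_append.hpp"
#include <string>

namespace llfio = LLFIO_V2_NAMESPACE;  // assumption: the conventional namespace alias
using llfio::algorithm::shared_fs_mutex::atomic_append;

int main()
{
  // Open or create the shared lock file (empty base = current directory)
  atomic_append mtx = atomic_append::fs_mutex_append({}, "mylockfile").value();
  // Derive an entity id from a name, requesting exclusive access
  auto my_resource = mtx.entity_from_string(std::string("my_resource"), true);
  {
    // Blocks until granted; the returned guard unlocks at scope exit
    auto guard = mtx.lock(my_resource).value();
    // ... exclusive access to my_resource across processes ...
  }
  return 0;
}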

Constructor & Destructor Documentation

◆ atomic_append()

llfio_v2_xxx::algorithm::shared_fs_mutex::atomic_append::atomic_append ( atomic_append &&  o)
inline noexcept

Move constructor.

    : _h(std::move(o._h)), _guard(std::move(o._guard)), _nfs_compatibility(o._nfs_compatibility), _skip_hashing(o._skip_hashing), _unique_id(o._unique_id), _header(o._header)
{
  _guard.set_handle(&_h);
}

Member Function Documentation

◆ _lock()

virtual result< void > llfio_v2_xxx::algorithm::shared_fs_mutex::atomic_append::_lock ( entities_guard &  out,
deadline  d,
bool  spin_not_sleep 
)
inline final protected virtual noexcept
Todo:
Read from header.last_known_good immediately if possible in order to avoid a duplicate read later

Implements llfio_v2_xxx::algorithm::shared_fs_mutex::shared_fs_mutex.

{
  LLFIO_LOG_FUNCTION_CALL(this);
  atomic_append_detail::lock_request lock_request;
  if(out.entities.size() > sizeof(lock_request.entities) / sizeof(lock_request.entities[0]))
  {
    return errc::argument_list_too_long;
  }

  std::chrono::steady_clock::time_point began_steady;
  std::chrono::system_clock::time_point end_utc;
  if(d)
  {
    if((d).steady)
    {
      began_steady = std::chrono::steady_clock::now();
    }
    else
    {
      end_utc = (d).to_time_point();
    }
  }
  // Fire this if an error occurs
  auto disableunlock = make_scope_exit([&]() noexcept { out.release(); });

  // Write my lock request immediately
  memset(&lock_request, 0, sizeof(lock_request));
  lock_request.unique_id = _unique_id;
  auto count = std::chrono::system_clock::now() - std::chrono::system_clock::from_time_t(_header.time_offset);
  lock_request.us_count = std::chrono::duration_cast<std::chrono::microseconds>(count).count();
  lock_request.items = out.entities.size();
  memcpy(lock_request.entities, out.entities.data(), sizeof(lock_request.entities[0]) * out.entities.size());
  if(!_skip_hashing)
  {
    lock_request.hash = QUICKCPPLIB_NAMESPACE::algorithm::hash::fast_hash::hash((reinterpret_cast<char *>(&lock_request)) + 16, sizeof(lock_request) - 16);
  }
  // My lock request will be the file's current length or higher
  OUTCOME_TRY(auto &&my_lock_request_offset, _h.maximum_extent());
  {
    OUTCOME_TRYV(_h.set_append_only(true));
    auto undo = make_scope_exit([this]() noexcept { (void) _h.set_append_only(false); });
    file_handle::extent_guard append_guard;
    if(_nfs_compatibility)
    {
      auto lastbyte = static_cast<file_handle::extent_type>(-1);
      // Lock up to the beginning of the shadow lock space
      lastbyte &= ~(1ULL << 63U);
      OUTCOME_TRY(auto &&append_guard_, _h.lock_file_range(my_lock_request_offset, lastbyte, lock_kind::exclusive));
      append_guard = std::move(append_guard_);
    }
    OUTCOME_TRYV(_h.write(0, {{reinterpret_cast<byte *>(&lock_request), sizeof(lock_request)}}));
  }

  // Find the record I just wrote
  alignas(64) byte _buffer[4096 + 2048];  // 6Kb cache line aligned buffer
  // Read onwards from length as reported before I wrote my lock request
  // until I find my lock request. This loop should never actually iterate
  // except under extreme load conditions.
  //! \todo Read from header.last_known_good immediately if possible in order
  //! to avoid a duplicate read later
  for(;;)
  {
    file_handle::buffer_type req{_buffer, sizeof(_buffer)};
    file_handle::io_result<file_handle::buffers_type> readoutcome = _h.read({{&req, 1}, my_lock_request_offset});
    // Should never happen :)
    if(readoutcome.has_error())
    {
      LLFIO_LOG_FATAL(this, "atomic_append::lock() saw an error when searching for just written data");
      std::terminate();
    }
    const atomic_append_detail::lock_request *record, *lastrecord;
    for(record = reinterpret_cast<const atomic_append_detail::lock_request *>(readoutcome.value()[0].data()), lastrecord = reinterpret_cast<const atomic_append_detail::lock_request *>(readoutcome.value()[0].data() + readoutcome.value()[0].size()); record < lastrecord && record->hash != lock_request.hash; ++record)
    {
      my_lock_request_offset += sizeof(atomic_append_detail::lock_request);
    }
    if(record->hash == lock_request.hash)
    {
      break;
    }
  }

  // extent_guard is now valid and will be unlocked on error
  out.hint = my_lock_request_offset;
  disableunlock.release();

  // Lock my request for writing so others can sleep on me
  file_handle::extent_guard my_request_guard;
  if(!spin_not_sleep)
  {
    auto lock_offset = my_lock_request_offset;
    // Set the top bit to use the shadow lock space on Windows
    lock_offset |= (1ULL << 63U);
    OUTCOME_TRY(auto &&my_request_guard_, _h.lock_file_range(lock_offset, sizeof(lock_request), lock_kind::exclusive));
    my_request_guard = std::move(my_request_guard_);
  }

  // Read every record preceding mine until header.first_known_good inclusive
  auto record_offset = my_lock_request_offset - sizeof(atomic_append_detail::lock_request);
  do
  {
  reload:
    // Refresh the header and load a snapshot of everything between record_offset
    // and first_known_good or -6Kb, whichever the sooner
    OUTCOME_TRYV(_read_header());
    // If there are no preceding records, we're done
    if(record_offset < _header.first_known_good)
    {
      break;
    }
    auto start_offset = record_offset;
    if(start_offset > sizeof(_buffer) - sizeof(atomic_append_detail::lock_request))
    {
      start_offset -= sizeof(_buffer) - sizeof(atomic_append_detail::lock_request);
    }
    else
    {
      start_offset = sizeof(atomic_append_detail::lock_request);
    }
    if(start_offset < _header.first_known_good)
    {
      start_offset = _header.first_known_good;
    }
    assert(record_offset >= start_offset);
    assert(record_offset - start_offset <= sizeof(_buffer));
    file_handle::buffer_type req{_buffer, (size_t)(record_offset - start_offset) + sizeof(atomic_append_detail::lock_request)};
    OUTCOME_TRY(auto &&batchread, _h.read({{&req, 1}, start_offset}));
    assert(batchread[0].size() == record_offset - start_offset + sizeof(atomic_append_detail::lock_request));
    const atomic_append_detail::lock_request *record = reinterpret_cast<atomic_append_detail::lock_request *>(batchread[0].data() + batchread[0].size() - sizeof(atomic_append_detail::lock_request));
    const atomic_append_detail::lock_request *firstrecord = reinterpret_cast<atomic_append_detail::lock_request *>(batchread[0].data());

    // Skip all completed lock requests or not mentioning any of my entities
    for(; record >= firstrecord; record_offset -= sizeof(atomic_append_detail::lock_request), --record)
    {
      // If a completed lock request, skip
      if(!record->hash && (record->unique_id == 0u))
      {
        continue;
      }
      // If record hash doesn't match contents it's a torn read, reload
      if(!_skip_hashing)
      {
        if(record->hash != QUICKCPPLIB_NAMESPACE::algorithm::hash::fast_hash::hash((reinterpret_cast<const char *>(record)) + 16, sizeof(atomic_append_detail::lock_request) - 16))
        {
          goto reload;
        }
      }

      // Does this record lock anything I am locking?
      for(const auto &entity : out.entities)
      {
        for(size_t n = 0; n < record->items; n++)
        {
          if(record->entities[n].value == entity.value)
          {
            // Is the lock I want exclusive or the lock he wants exclusive?
            // If so, need to block
            if((record->entities[n].exclusive != 0u) || (entity.exclusive != 0u))
            {
              goto beginwait;
            }
          }
        }
      }
    }
    // None of this batch of records has anything to do with my request, so keep going
    continue;

  beginwait:
    // Sleep until this record is freed using a shared lock
    // on the record in our way. Note there is a race here
    // between when the lock requester writes the lock
    // request and when he takes an exclusive lock on it,
    // so if our shared lock succeeds we need to immediately
    // unlock and retry based on the data.
    std::this_thread::yield();
    if(!spin_not_sleep)
    {
      deadline nd;
      if(d)
      {
        if((d).steady)
        {
          std::chrono::nanoseconds ns = std::chrono::duration_cast<std::chrono::nanoseconds>((began_steady + std::chrono::nanoseconds((d).nsecs)) - std::chrono::steady_clock::now());
          if(ns.count() < 0)
          {
            (nd).nsecs = 0;
          }
          else
          {
            (nd).nsecs = ns.count();
          }
        }
        else
        {
          (nd) = (d);
        }
      }
      auto lock_offset = record_offset;
      // Set the top bit to use the shadow lock space on Windows
      lock_offset |= (1ULL << 63U);
      OUTCOME_TRYV(_h.lock_file_range(lock_offset, sizeof(*record), lock_kind::shared, nd));
    }
    // Make sure we haven't timed out during this wait
    if(d)
    {
      if((d).steady)
      {
        if(std::chrono::steady_clock::now() >= (began_steady + std::chrono::nanoseconds((d).nsecs)))
        {
          return errc::timed_out;
        }
      }
      else
      {
        if(std::chrono::system_clock::now() >= end_utc)
        {
          return errc::timed_out;
        }
      }
    }
  } while(record_offset >= _header.first_known_good);
  return success();
}

◆ entity_from_buffer()

entity_type llfio_v2_xxx::algorithm::shared_fs_mutex::shared_fs_mutex::entity_from_buffer ( const char *  buffer,
size_t  bytes,
bool  exclusive = true 
)
inline noexcept inherited

Generates an entity id from a sequence of bytes.

{
  uint128 hash = QUICKCPPLIB_NAMESPACE::algorithm::hash::fast_hash::hash(buffer, bytes);
  return {hash.as_longlongs[0] ^ hash.as_longlongs[1], exclusive};
}
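A short illustrative sketch (mtx and the llfio alias as in the earlier sketch; the 16-byte key is hypothetical):

// Hash an application-defined binary key into an entity id
const char key[16] = {0};  // hypothetical 16-byte identifier
auto entity = mtx.entity_from_buffer(key, sizeof(key), /*exclusive=*/true);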

◆ entity_from_string()

template<typename T >
entity_type llfio_v2_xxx::algorithm::shared_fs_mutex::shared_fs_mutex::entity_from_string ( const std::basic_string< T > &  str,
bool  exclusive = true 
)
inline noexcept inherited

Generates an entity id from a string.

{
  uint128 hash = QUICKCPPLIB_NAMESPACE::algorithm::hash::fast_hash::hash(str);
  return {hash.as_longlongs[0] ^ hash.as_longlongs[1], exclusive};
}

◆ fill_random_entities()

void llfio_v2_xxx::algorithm::shared_fs_mutex::shared_fs_mutex::fill_random_entities ( span< entity_type >  seq,
bool  exclusive = true 
)
inline noexcept inherited

Fills a sequence of entity ids with cryptographic randomness. Much faster than calling random_entity() individually.

{
  utils::random_fill(reinterpret_cast<char *>(seq.data()), seq.size() * sizeof(entity_type));
  for(auto &i : seq)
  {
    i.exclusive = exclusive;  // NOLINT
  }
}
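An illustrative sketch (mtx as before). This sketch assumes entity_type has no default constructor, so the array is seeded with placeholder ids before being overwritten:

// Batch-generate four random entity ids requesting shared access
auto seed = mtx.random_entity();  // placeholder values, overwritten below
atomic_append::entity_type ids[4] = {seed, seed, seed, seed};
mtx.fill_random_entities({ids, 4}, /*exclusive=*/false);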

◆ fs_mutex_append()

static result< atomic_append > llfio_v2_xxx::algorithm::shared_fs_mutex::atomic_append::fs_mutex_append ( const path_handle &  base,
path_view  lockfile,
bool  nfs_compatibility = false,
bool  skip_hashing = false 
)
inline static noexcept

Initialises a shared filing system mutex using the file at lockfile.

Returns
An implementation of shared_fs_mutex using the atomic_append algorithm.
Parameters
  base               Optional base for the path to the file.
  lockfile           The path to the file to use for IPC.
  nfs_compatibility  Make this true if the lockfile could be accessed by NFS.
  skip_hashing       Some filing systems (typically the copy on write ones e.g. ZFS, btrfs) guarantee atomicity of updates and therefore torn writes are never observed by readers. For these, hashing can be safely disabled.
Todo:
fs_mutex_append needs to check if file still exists after lock is granted, awaiting path fetching.
{
  LLFIO_LOG_FUNCTION_CALL(0);
  OUTCOME_TRY(auto &&ret, file_handle::file(base, lockfile, file_handle::mode::write, file_handle::creation::if_needed, file_handle::caching::temporary));
  atomic_append_detail::header header;
  // Lock the entire header for exclusive access
  auto lockresult = ret.lock_file_range(0, sizeof(header), lock_kind::exclusive, std::chrono::seconds(0));
  //! \todo fs_mutex_append needs to check if file still exists after lock is granted, awaiting path fetching.
  if(lockresult.has_error())
  {
    if(lockresult.error() != errc::timed_out)
    {
      return std::move(lockresult).error();
    }
    // Somebody else is also using this file
  }
  else
  {
    // I am the first person to be using this (stale?) file, so write a new header and truncate
    OUTCOME_TRYV(ret.truncate(sizeof(header)));
    memset(&header, 0, sizeof(header));
    header.time_offset = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
    header.first_known_good = sizeof(header);
    header.first_after_hole_punch = sizeof(header);
    if(!skip_hashing)
    {
      header.hash = QUICKCPPLIB_NAMESPACE::algorithm::hash::fast_hash::hash((reinterpret_cast<char *>(&header)) + 16, sizeof(header) - 16);
    }
    OUTCOME_TRYV(ret.write(0, {{reinterpret_cast<byte *>(&header), sizeof(header)}}));
  }
  // Open a shared lock on last byte in header to prevent other users zomping the file
  OUTCOME_TRY(auto &&guard, ret.lock_file_range(sizeof(header) - 1, 1, lock_kind::shared));
  // Unlock any exclusive lock I gained earlier now
  if(lockresult)
  {
    lockresult.value().unlock();
  }
  // The constructor will read and cache the header
  return atomic_append(std::move(ret), std::move(guard), nfs_compatibility, skip_hashing);
}
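A hedged construction sketch (the directory path and the use of .value(), which throws on failure, are illustrative choices):

// Open the lock file inside an already-open directory, with NFS compatibility
auto base = llfio::path_handle::path("/tmp/myapp").value();
atomic_append mtx = atomic_append::fs_mutex_append(base, "lockfile",
                                                   /*nfs_compatibility=*/true).value();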

◆ handle()

const file_handle & llfio_v2_xxx::algorithm::shared_fs_mutex::atomic_append::handle ( ) const
inline noexcept

Return the handle to file being used for this lock.

{ return _h; }

◆ lock() [1/2]

result< entities_guard > llfio_v2_xxx::algorithm::shared_fs_mutex::shared_fs_mutex::lock ( entities_type  entities,
deadline  d = deadline(),
bool  spin_not_sleep = false 
)
inline noexcept inherited

Lock all of a sequence of entities for exclusive or shared access.

{
  entities_guard ret(this, entities);
  OUTCOME_TRYV(_lock(ret, d, spin_not_sleep));
  return {std::move(ret)};
}
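
An illustrative sketch of locking several entities under a deadline (mtx and the llfio alias as before; the table names are hypothetical):

// Lock two entities together, giving up after 100 milliseconds
atomic_append::entity_type entities[2] = {mtx.entity_from_string(std::string("table_a")),
                                          mtx.entity_from_string(std::string("table_b"))};
auto r = mtx.lock({entities, 2}, llfio::deadline(std::chrono::milliseconds(100)));
if(!r)
{
  // r.error() is errc::timed_out if the deadline passed first
}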

◆ lock() [2/2]

result< entities_guard > llfio_v2_xxx::algorithm::shared_fs_mutex::shared_fs_mutex::lock ( entity_type  entity,
deadline  d = deadline(),
bool  spin_not_sleep = false 
)
inline noexcept inherited

Lock a single entity for exclusive or shared access.

{
  entities_guard ret(this, entity);
  OUTCOME_TRYV(_lock(ret, d, spin_not_sleep));
  return {std::move(ret)};
}

◆ operator=()

atomic_append & llfio_v2_xxx::algorithm::shared_fs_mutex::atomic_append::operator= ( atomic_append &&  o)
inline noexcept

Move assign.

{
  if(this == &o)
  {
    return *this;
  }
  this->~atomic_append();
  new(this) atomic_append(std::move(o));
  return *this;
}

◆ random_entity()

entity_type llfio_v2_xxx::algorithm::shared_fs_mutex::shared_fs_mutex::random_entity ( bool  exclusive = true)
inline noexcept inherited

Generates a cryptographically random entity id.

{
  entity_type::value_type v;
  utils::random_fill(reinterpret_cast<char *>(&v), sizeof(v));
  return {v, exclusive};
}
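For example (mtx as before):

// One-off cryptographically random entity id requesting shared access
auto anon = mtx.random_entity(/*exclusive=*/false);
auto guard = mtx.lock(anon).value();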

◆ try_lock() [1/2]

result< entities_guard > llfio_v2_xxx::algorithm::shared_fs_mutex::shared_fs_mutex::try_lock ( entities_type  entities)
inline noexcept inherited

Try to lock all of a sequence of entities for exclusive or shared access.

{ return lock(entities, deadline(std::chrono::seconds(0))); }
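Since try_lock() is simply lock() with a zero deadline, contention surfaces as errc::timed_out. An illustrative sketch (mtx and my_resource as before):

// Non-blocking attempt on a single entity
if(auto g = mtx.try_lock(my_resource))
{
  // Lock granted; the guard held inside g unlocks when g goes out of scope
}
else
{
  // Somebody else holds a conflicting lock (errc::timed_out)
}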

◆ try_lock() [2/2]

result< entities_guard > llfio_v2_xxx::algorithm::shared_fs_mutex::shared_fs_mutex::try_lock ( entity_type  entity)
inline noexcept inherited

Try to lock a single entity for exclusive or shared access.

{ return lock(entity, deadline(std::chrono::seconds(0))); }

◆ unlock()

virtual void llfio_v2_xxx::algorithm::shared_fs_mutex::atomic_append::unlock ( entities_type  entities,
unsigned long long  hint 
)
inline final virtual noexcept

Unlock a previously locked sequence of entities.

Implements llfio_v2_xxx::algorithm::shared_fs_mutex::shared_fs_mutex.

{
  (void) entities;
  LLFIO_LOG_FUNCTION_CALL(this);
  if(hint == 0u)
  {
    LLFIO_LOG_WARN(this, "atomic_append::unlock() currently requires a hint to work, assuming this is a failed lock.");
    return;
  }
  auto my_lock_request_offset = static_cast<file_handle::extent_type>(hint);
  {
    atomic_append_detail::lock_request record;
#ifdef _DEBUG
    (void) _h.read(my_lock_request_offset, {{(byte *) &record, sizeof(record)}});
    if(!record.unique_id)
    {
      LLFIO_LOG_FATAL(this, "atomic_append::unlock() I have been previously unlocked!");
      std::terminate();
    }
    (void) _read_header();
    if(_header.first_known_good > my_lock_request_offset)
    {
      LLFIO_LOG_FATAL(this, "atomic_append::unlock() header exceeds the lock I am unlocking!");
      std::terminate();
    }
#endif
    memset(&record, 0, sizeof(record));
    (void) _h.write(my_lock_request_offset, {{reinterpret_cast<byte *>(&record), sizeof(record)}});
  }

  // Every 32 records or so, bump _header.first_known_good
  if((my_lock_request_offset & 4095U) == 0U)
  {
    //_read_header();

    // Forward scan records until first non-zero record is found
    // and update header with new info
    alignas(64) byte _buffer[4096 + 2048];
    bool done = false;
    while(!done)
    {
      file_handle::buffer_type req{_buffer, sizeof(_buffer)};
      auto bytesread_ = _h.read({{&req, 1}, _header.first_known_good});
      if(bytesread_.has_error())
      {
        // If distance between original first known good and end of file is exactly
        // 6Kb we can read an EOF
        break;
      }
      const auto &bytesread = bytesread_.value();
      // If read was partial, we are done after this round
      if(bytesread[0].size() < sizeof(_buffer))
      {
        done = true;
      }
      const auto *record = reinterpret_cast<const atomic_append_detail::lock_request *>(bytesread[0].data());
      const auto *lastrecord = reinterpret_cast<const atomic_append_detail::lock_request *>(bytesread[0].data() + bytesread[0].size());
      for(; record < lastrecord; ++record)
      {
        if(!record->hash && (record->unique_id == 0u))
        {
          _header.first_known_good += sizeof(atomic_append_detail::lock_request);
        }
        else
        {
          break;
        }
      }
    }
    // Hole punch if >= 1Mb of zeros exists
    if(_header.first_known_good - _header.first_after_hole_punch >= 1024U * 1024U)
    {
      handle::extent_type holepunchend = _header.first_known_good & ~(1024U * 1024U - 1);
#ifdef _DEBUG
      fprintf(stderr, "hole_punch(%llx, %llx)\n", _header.first_after_hole_punch, holepunchend - _header.first_after_hole_punch);
#endif
      _header.first_after_hole_punch = holepunchend;
    }
    ++_header.generation;
    if(!_skip_hashing)
    {
      _header.hash = QUICKCPPLIB_NAMESPACE::algorithm::hash::fast_hash::hash((reinterpret_cast<char *>(&_header)) + 16, sizeof(_header) - 16);
    }
    // Rewrite the first part of the header only
    (void) _h.write(0, {{reinterpret_cast<byte *>(&_header), 48}});
  }
}
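unlock() is normally invoked for you by the entities_guard returned from lock(), which passes back the offset recorded in its hint member. A hedged sketch of what that amounts to (manual use like this is rarely needed):

auto guard = mtx.lock(my_resource).value();
// ... critical section ...
// Equivalent to letting the guard destruct:
mtx.unlock(guard.entities, guard.hint);
guard.release();  // detach so the guard's destructor does not unlock twice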

The documentation for this class was generated from the following file:
  • atomic_append.hpp