LLFIO  v2.00
llfio_v2_xxx::algorithm::shared_fs_mutex::atomic_append Class Reference

Scalable many entity shared/exclusive file system based lock. More...

#include "atomic_append.hpp"

Inheritance diagram for llfio_v2_xxx::algorithm::shared_fs_mutex::atomic_append:
llfio_v2_xxx::algorithm::shared_fs_mutex::shared_fs_mutex

Public Types

using entity_type = shared_fs_mutex::entity_type
 The type of an entity id.
 
using entities_type = shared_fs_mutex::entities_type
 The type of a sequence of entities.
 

Public Member Functions

 atomic_append (const atomic_append &)=delete
 No copy construction.
 
atomic_append & operator= (const atomic_append &)=delete
 No copy assignment.
 
 atomic_append (atomic_append &&o) noexcept
 Move constructor.
 
atomic_append & operator= (atomic_append &&o) noexcept
 Move assign.
 
const file_handle & handle () const noexcept
 Return the handle to file being used for this lock.
 
virtual void unlock (entities_type entities, unsigned long long hint) noexcept final
 Unlock a previously locked sequence of entities.
 
entity_type entity_from_buffer (const char *buffer, size_t bytes, bool exclusive=true) noexcept
 Generates an entity id from a sequence of bytes.
 
template<typename T >
entity_type entity_from_string (const std::basic_string< T > &str, bool exclusive=true) noexcept
 Generates an entity id from a string.
 
entity_type random_entity (bool exclusive=true) noexcept
 Generates a cryptographically random entity id.
 
void fill_random_entities (span< entity_type > seq, bool exclusive=true) noexcept
 Fills a sequence of entity ids with cryptographic randomness. Much faster than calling random_entity() individually.
 
result< entities_guard > lock (entities_type entities, deadline d=deadline(), bool spin_not_sleep=false) noexcept
 Lock all of a sequence of entities for exclusive or shared access.
 
result< entities_guard > lock (entity_type entity, deadline d=deadline(), bool spin_not_sleep=false) noexcept
 Lock a single entity for exclusive or shared access.
 
result< entities_guard > try_lock (entities_type entities) noexcept
 Try to lock all of a sequence of entities for exclusive or shared access.
 
result< entities_guard > try_lock (entity_type entity) noexcept
 Try to lock a single entity for exclusive or shared access.
 

Static Public Member Functions

static result< atomic_append > fs_mutex_append (const path_handle &base, path_view lockfile, bool nfs_compatibility=false, bool skip_hashing=false) noexcept
 

Protected Member Functions

virtual result< void > _lock (entities_guard &out, deadline d, bool spin_not_sleep) noexcept final
 

Detailed Description

Scalable many entity shared/exclusive file system based lock.

Lock files and byte ranges scale poorly with the number of items being concurrently locked, typically suffering an exponential drop-off in performance as that number rises. This file system algorithm solves the problem by performing IPC through a shared append-only lock file.

  • Compatible with networked file systems (including NFS, if the special nfs_compatibility flag is true; note that enabling it is not free of cost when you don't need NFS compatibility).
  • Nearly constant time with respect to the number of entities being locked.
  • Nearly constant time with respect to the number of processes concurrently using the lock (i.e. the number of waiters).
  • Can sleep until a lock becomes free in a power-efficient manner.
  • Sudden power loss during use is recovered from.

Caveats:

  • Much slower than byte_ranges for few waiters or a small number of entities.
  • Sudden process exit with locks held will deadlock all other users.
  • Maximum of twelve entities may be locked concurrently.
  • Wasteful of disk space if used on a non-extents based filing system (e.g. FAT32, ext3). It is best used in /tmp if possible (file_handle::temp_file()). If you really must use a non-extents based filing system, destroy and recreate the object instance periodically to force resetting the lock file's length to zero.
  • Similarly, older operating systems (e.g. Linux < 3.0) do not implement extent hole punching and therefore will also see excessive disk space consumption. Note that at the time of writing, OS X doesn't implement hole punching at all.
  • If your OS doesn't have sane byte range locks (OS X, BSD, older Linuxes) and multiple objects in your process use the same lock file, misoperation will occur. Use lock_files instead.

    Todo:

    Implement hole punching once I port that code from LLFIO v1.

    Decide on some resolution mechanism for sudden process exit.

    There is a 1 out of 2^64-2 chance of unique id collision. It would be nice if we formally checked that our chosen unique id is actually unique.
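As a quick orientation, a minimal usage sketch is shown below. The lock file name, the resource strings, the namespace spelling, the entities_type (pointer, count) construction, and the assumption that an empty path_handle base means a path relative to the current working directory are all illustrative, and error handling is reduced to .value().

  #include "atomic_append.hpp"  // as included above; adjust to your include layout
  #include <string>

  namespace sfm = llfio_v2_xxx::algorithm::shared_fs_mutex;

  int main()
  {
    // Create or open "example.lock"; an empty path_handle base is assumed
    // to mean the path is interpreted relative to the current directory.
    auto mtx = sfm::atomic_append::fs_mutex_append({}, "example.lock").value();

    // Derive entity ids from the names of the resources to be locked.
    sfm::atomic_append::entity_type entities[2] = {
        mtx.entity_from_string(std::string("resource-a"), true),   // exclusive
        mtx.entity_from_string(std::string("resource-b"), false)}; // shared

    // Lock the whole sequence; the returned guard releases both entities on destruction.
    auto guard = mtx.lock(sfm::atomic_append::entities_type(entities, 2)).value();

    // ... operate on the protected resources while the guard is alive ...
    return 0;
  }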

Member Function Documentation

◆ _lock()

virtual result<void> llfio_v2_xxx::algorithm::shared_fs_mutex::atomic_append::_lock (entities_guard &out, deadline d, bool spin_not_sleep)
inline, final, protected, virtual, noexcept
Todo:
Read from header.last_known_good immediately if possible in order to avoid a duplicate read later

Implements llfio_v2_xxx::algorithm::shared_fs_mutex::shared_fs_mutex.

  {
    LLFIO_LOG_FUNCTION_CALL(this);
    atomic_append_detail::lock_request lock_request;
    if(out.entities.size() > sizeof(lock_request.entities) / sizeof(lock_request.entities[0]))
    {
      return errc::argument_list_too_long;
    }

    std::chrono::steady_clock::time_point began_steady;
    std::chrono::system_clock::time_point end_utc;
    if(d)
    {
      if((d).steady)
      {
        began_steady = std::chrono::steady_clock::now();
      }
      else
      {
        end_utc = (d).to_time_point();
      }
    }
    // Fire this if an error occurs
    auto disableunlock = make_scope_exit([&]() noexcept { out.release(); });

    // Write my lock request immediately
    memset(&lock_request, 0, sizeof(lock_request));
    lock_request.unique_id = _unique_id;
    auto count = std::chrono::system_clock::now() - std::chrono::system_clock::from_time_t(_header.time_offset);
    lock_request.us_count = std::chrono::duration_cast<std::chrono::microseconds>(count).count();
    lock_request.items = out.entities.size();
    memcpy(lock_request.entities, out.entities.data(), sizeof(lock_request.entities[0]) * out.entities.size());
    if(!_skip_hashing)
    {
      lock_request.hash = QUICKCPPLIB_NAMESPACE::algorithm::hash::fast_hash::hash((reinterpret_cast<char *>(&lock_request)) + 16, sizeof(lock_request) - 16);
    }
    // My lock request will be the file's current length or higher
    OUTCOME_TRY(auto &&my_lock_request_offset, _h.maximum_extent());
    {
      OUTCOME_TRYV(_h.set_append_only(true));
      auto undo = make_scope_exit([this]() noexcept { (void) _h.set_append_only(false); });
      file_handle::extent_guard append_guard;
      if(_nfs_compatibility)
      {
        auto lastbyte = static_cast<file_handle::extent_type>(-1);
        // Lock up to the beginning of the shadow lock space
        lastbyte &= ~(1ULL << 63U);
        OUTCOME_TRY(auto &&append_guard_, _h.lock_file_range(my_lock_request_offset, lastbyte, lock_kind::exclusive));
        append_guard = std::move(append_guard_);
      }
      OUTCOME_TRYV(_h.write(0, {{reinterpret_cast<byte *>(&lock_request), sizeof(lock_request)}}));
    }

    // Find the record I just wrote
    alignas(64) byte _buffer[4096 + 2048];  // 6Kb cache line aligned buffer
    // Read onwards from length as reported before I wrote my lock request
    // until I find my lock request. This loop should never actually iterate
    // except under extreme load conditions.
    //! \todo Read from header.last_known_good immediately if possible in order
    //! to avoid a duplicate read later
    for(;;)
    {
      file_handle::buffer_type req{_buffer, sizeof(_buffer)};
      file_handle::io_result<file_handle::buffers_type> readoutcome = _h.read({{&req, 1}, my_lock_request_offset});
      // Should never happen :)
      if(readoutcome.has_error())
      {
        LLFIO_LOG_FATAL(this, "atomic_append::lock() saw an error when searching for just written data");
        std::terminate();
      }
      const atomic_append_detail::lock_request *record, *lastrecord;
      for(record = reinterpret_cast<const atomic_append_detail::lock_request *>(readoutcome.value()[0].data()), lastrecord = reinterpret_cast<const atomic_append_detail::lock_request *>(readoutcome.value()[0].data() + readoutcome.value()[0].size()); record < lastrecord && record->hash != lock_request.hash; ++record)
      {
        my_lock_request_offset += sizeof(atomic_append_detail::lock_request);
      }
      if(record->hash == lock_request.hash)
      {
        break;
      }
    }

    // extent_guard is now valid and will be unlocked on error
    out.hint = my_lock_request_offset;
    disableunlock.release();

    // Lock my request for writing so others can sleep on me
    file_handle::extent_guard my_request_guard;
    if(!spin_not_sleep)
    {
      auto lock_offset = my_lock_request_offset;
      // Set the top bit to use the shadow lock space on Windows
      lock_offset |= (1ULL << 63U);
      OUTCOME_TRY(auto &&my_request_guard_, _h.lock_file_range(lock_offset, sizeof(lock_request), lock_kind::exclusive));
      my_request_guard = std::move(my_request_guard_);
    }

    // Read every record preceding mine until header.first_known_good inclusive
    auto record_offset = my_lock_request_offset - sizeof(atomic_append_detail::lock_request);
    do
    {
    reload:
      // Refresh the header and load a snapshot of everything between record_offset
      // and first_known_good or -6Kb, whichever the sooner
      OUTCOME_TRYV(_read_header());
      // If there are no preceding records, we're done
      if(record_offset < _header.first_known_good)
      {
        break;
      }
      auto start_offset = record_offset;
      if(start_offset > sizeof(_buffer) - sizeof(atomic_append_detail::lock_request))
      {
        start_offset -= sizeof(_buffer) - sizeof(atomic_append_detail::lock_request);
      }
      else
      {
        start_offset = sizeof(atomic_append_detail::lock_request);
      }
      if(start_offset < _header.first_known_good)
      {
        start_offset = _header.first_known_good;
      }
      assert(record_offset >= start_offset);
      assert(record_offset - start_offset <= sizeof(_buffer));
      file_handle::buffer_type req{_buffer, (size_t)(record_offset - start_offset) + sizeof(atomic_append_detail::lock_request)};
      OUTCOME_TRY(auto &&batchread, _h.read({{&req, 1}, start_offset}));
      assert(batchread[0].size() == record_offset - start_offset + sizeof(atomic_append_detail::lock_request));
      const atomic_append_detail::lock_request *record = reinterpret_cast<atomic_append_detail::lock_request *>(batchread[0].data() + batchread[0].size() - sizeof(atomic_append_detail::lock_request));
      const atomic_append_detail::lock_request *firstrecord = reinterpret_cast<atomic_append_detail::lock_request *>(batchread[0].data());

      // Skip all completed lock requests or not mentioning any of my entities
      for(; record >= firstrecord; record_offset -= sizeof(atomic_append_detail::lock_request), --record)
      {
        // If a completed lock request, skip
        if(!record->hash && (record->unique_id == 0u))
        {
          continue;
        }
        // If record hash doesn't match contents it's a torn read, reload
        if(!_skip_hashing)
        {
          if(record->hash != QUICKCPPLIB_NAMESPACE::algorithm::hash::fast_hash::hash((reinterpret_cast<const char *>(record)) + 16, sizeof(atomic_append_detail::lock_request) - 16))
          {
            goto reload;
          }
        }

        // Does this record lock anything I am locking?
        for(const auto &entity : out.entities)
        {
          for(size_t n = 0; n < record->items; n++)
          {
            if(record->entities[n].value == entity.value)
            {
              // Is the lock I want exclusive or the lock he wants exclusive?
              // If so, need to block
              if((record->entities[n].exclusive != 0u) || (entity.exclusive != 0u))
              {
                goto beginwait;
              }
            }
          }
        }
      }
      // None of this batch of records has anything to do with my request, so keep going
      continue;

    beginwait:
      // Sleep until this record is freed using a shared lock
      // on the record in our way. Note there is a race here
      // between when the lock requester writes the lock
      // request and when he takes an exclusive lock on it,
      // so if our shared lock succeeds we need to immediately
      // unlock and retry based on the data.
      std::this_thread::yield();
      if(!spin_not_sleep)
      {
        deadline nd;
        if(d)
        {
          if((d).steady)
          {
            std::chrono::nanoseconds ns = std::chrono::duration_cast<std::chrono::nanoseconds>((began_steady + std::chrono::nanoseconds((d).nsecs)) - std::chrono::steady_clock::now());
            if(ns.count() < 0)
            {
              (nd).nsecs = 0;
            }
            else
            {
              (nd).nsecs = ns.count();
            }
          }
          else
          {
            (nd) = (d);
          }
        }
        auto lock_offset = record_offset;
        // Set the top bit to use the shadow lock space on Windows
        lock_offset |= (1ULL << 63U);
        OUTCOME_TRYV(_h.lock_file_range(lock_offset, sizeof(*record), lock_kind::shared, nd));
      }
      // Make sure we haven't timed out during this wait
      if(d)
      {
        if((d).steady)
        {
          if(std::chrono::steady_clock::now() >= (began_steady + std::chrono::nanoseconds((d).nsecs)))
          {
            return errc::timed_out;
          }
        }
        else
        {
          if(std::chrono::system_clock::now() >= end_utc)
          {
            return errc::timed_out;
          }
        }
      }
    } while(record_offset >= _header.first_known_good);
    return success();
  }
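For orientation when reading the loop above: the unit the algorithm appends and scans is the fixed-size atomic_append_detail::lock_request record. The listing only shows how its fields are used (the hash covers everything after the first 16 bytes, a completed request has hash and unique_id zeroed, and at most twelve entities fit per record), so the layout below is an inferred sketch, not the authoritative definition.

  // Inferred sketch only; exact field types and widths are assumptions.
  // uint128 stands for the 128-bit integer type returned by QuickCppLib's fast_hash.
  struct lock_request
  {
    uint128 hash;              // fast_hash of the record minus these first 16 bytes (unused if skip_hashing)
    uint64_t unique_id;        // random id of the locking instance; zeroed when the request completes
    uint64_t us_count : 56;    // microseconds since header.time_offset
    uint64_t items : 8;        // number of valid entries in entities[]
    entity_type entities[12];  // the entities being locked, hence the twelve-entity cap
  };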

◆ fs_mutex_append()

static result<atomic_append> llfio_v2_xxx::algorithm::shared_fs_mutex::atomic_append::fs_mutex_append (const path_handle &base, path_view lockfile, bool nfs_compatibility = false, bool skip_hashing = false)
inline, static, noexcept

Initialises a shared filing system mutex using the file at lockfile

Returns
An implementation of shared_fs_mutex using the atomic_append algorithm.
Parameters
  base               Optional base for the path to the file.
  lockfile           The path to the file to use for IPC.
  nfs_compatibility  Make this true if the lockfile could be accessed by NFS.
  skip_hashing       Some filing systems (typically the copy-on-write ones, e.g. ZFS, btrfs) guarantee atomicity of updates, and therefore torn writes are never observed by readers. For these, hashing can be safely disabled.
Todo:
fs_mutex_append needs to check if file still exists after lock is granted, awaiting path fetching.
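For instance, a call for a lock file reachable over NFS might look like the sketch below. The path is illustrative, and hashing is left enabled because the file is not assumed to live on a copy-on-write filing system.

  // Illustrative only: enable NFS compatibility, keep torn-write detection (hashing) on.
  auto mtx = llfio_v2_xxx::algorithm::shared_fs_mutex::atomic_append::fs_mutex_append(
                 {}, "orders.lock", /*nfs_compatibility=*/true, /*skip_hashing=*/false)
                 .value();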
  {
    LLFIO_LOG_FUNCTION_CALL(0);
    // Open or create the lock file for writing. The exact call is inferred from the
    // cross-referenced file_handle::file() overload; treat the arguments as indicative.
    OUTCOME_TRY(auto &&ret, file_handle::file(base, lockfile, file_handle::mode::write, file_handle::creation::if_needed, file_handle::caching::temporary));
    atomic_append_detail::header header;
    // Lock the entire header for exclusive access
    auto lockresult = ret.lock_file_range(0, sizeof(header), lock_kind::exclusive, std::chrono::seconds(0));
    //! \todo fs_mutex_append needs to check if file still exists after lock is granted, awaiting path fetching.
    if(lockresult.has_error())
    {
      if(lockresult.error() != errc::timed_out)
      {
        return std::move(lockresult).error();
      }
      // Somebody else is also using this file
    }
    else
    {
      // I am the first person to be using this (stale?) file, so write a new header and truncate
      OUTCOME_TRYV(ret.truncate(sizeof(header)));
      memset(&header, 0, sizeof(header));
      header.time_offset = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
      header.first_known_good = sizeof(header);
      header.first_after_hole_punch = sizeof(header);
      if(!skip_hashing)
      {
        header.hash = QUICKCPPLIB_NAMESPACE::algorithm::hash::fast_hash::hash((reinterpret_cast<char *>(&header)) + 16, sizeof(header) - 16);
      }
      OUTCOME_TRYV(ret.write(0, {{reinterpret_cast<byte *>(&header), sizeof(header)}}));
    }
    // Open a shared lock on last byte in header to prevent other users zomping the file
    OUTCOME_TRY(auto &&guard, ret.lock_file_range(sizeof(header) - 1, 1, lock_kind::shared));
    // Unlock any exclusive lock I gained earlier now
    if(lockresult)
    {
      lockresult.value().unlock();
    }
    // The constructor will read and cache the header
    return atomic_append(std::move(ret), std::move(guard), nfs_compatibility, skip_hashing);
  }
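The header this routine initialises is atomic_append_detail::header. Only the fields touched above are visible in the listing, so the following is an inferred sketch rather than the authoritative definition.

  // Inferred sketch only; exact field types and widths are assumptions.
  struct header
  {
    uint128 hash;                     // fast_hash of the header minus these first 16 bytes (unused if skip_hashing)
    uint64_t time_offset;             // time_t at creation; lock_request::us_count is measured from this
    uint64_t first_known_good;        // offset of the oldest record which may still be live
    uint64_t first_after_hole_punch;  // offset just after the most recently hole-punched region
  };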

The documentation for this class was generated from the following file:
  atomic_append.hpp