AFIO  v2.00 late alpha
afio_v2_xxx::algorithm::shared_fs_mutex::atomic_append Class Reference

Scalable many entity shared/exclusive file system based lock. More...

#include "atomic_append.hpp"

Inheritance diagram for afio_v2_xxx::algorithm::shared_fs_mutex::atomic_append:
afio_v2_xxx::algorithm::shared_fs_mutex::shared_fs_mutex

Public Types

using entity_type = shared_fs_mutex::entity_type
 The type of an entity id.
 
using entities_type = shared_fs_mutex::entities_type
 The type of a sequence of entities.
 

Public Member Functions

 atomic_append (const atomic_append &)=delete
 No copy construction.
 
atomic_append & operator= (const atomic_append &)=delete
 No copy assignment.
 
 atomic_append (atomic_append &&o) noexcept
 Move constructor.
 
atomic_append & operator= (atomic_append &&o) noexcept
 Move assign.
 
const file_handle & handle () const noexcept
 Return the handle to file being used for this lock.
 
virtual void unlock (entities_type entities, unsigned long long hint) noexcept final
 Unlock a previously locked sequence of entities.
 
entity_type entity_from_buffer (const char *buffer, size_t bytes, bool exclusive=true) noexcept
 Generates an entity id from a sequence of bytes.
 
template<typename T >
entity_type entity_from_string (const std::basic_string< T > &str, bool exclusive=true) noexcept
 Generates an entity id from a string.
 
entity_type random_entity (bool exclusive=true) noexcept
 Generates a cryptographically random entity id.
 
void fill_random_entities (span< entity_type > seq, bool exclusive=true) noexcept
 Fills a sequence of entity ids with cryptographic randomness. Much faster than calling random_entity() individually.
 
result< entities_guard > lock (entities_type entities, deadline d=deadline(), bool spin_not_sleep=false) noexcept
 Lock all of a sequence of entities for exclusive or shared access.
 
result< entities_guard > lock (entity_type entity, deadline d=deadline(), bool spin_not_sleep=false) noexcept
 Lock a single entity for exclusive or shared access.
 
result< entities_guard > try_lock (entities_type entities) noexcept
 Try to lock all of a sequence of entities for exclusive or shared access.
 
result< entities_guard > try_lock (entity_type entity) noexcept
 Try to lock a single entity for exclusive or shared access.
 

Static Public Member Functions

static result< atomic_append > fs_mutex_append (const path_handle &base, path_view lockfile, bool nfs_compatibility=false, bool skip_hashing=false) noexcept
 

Protected Member Functions

virtual result< void > _lock (entities_guard &out, deadline d, bool spin_not_sleep) noexcept final
 

Detailed Description

Scalable many entity shared/exclusive file system based lock.

Lock files and byte ranges scale poorly to the number of items being concurrently locked with typically an exponential drop off in performance as the number of items being concurrently locked rises. This file system algorithm solves this problem using IPC via a shared append-only lock file.

  • Compatible with networked file systems (including NFS, if the special nfs_compatibility flag is true; note that turning this on is not free of cost if you don't need NFS compatibility).
  • Nearly constant time to number of entities being locked.
  • Nearly constant time to number of processes concurrently using the lock (i.e. number of waiters).
  • Can sleep until a lock becomes free in a power-efficient manner.
  • Sudden power loss during use is recovered from.

Caveats:

  • Much slower than byte_ranges for few waiters or a small number of entities.
  • Sudden process exit with locks held will deadlock all other users.
  • Maximum of twelve entities may be locked concurrently.
  • Wasteful of disk space if used on a non-extents based filing system (e.g. FAT32, ext3). It is best used in /tmp if possible (file_handle::temp_file()). If you really must use a non-extents based filing system, destroy and recreate the object instance periodically to force resetting the lock file's length to zero.
  • Similarly, older operating systems (e.g. Linux < 3.0) do not implement extent hole punching and therefore will also see excessive disk space consumption. Note that at the time of writing OS X doesn't implement hole punching at all.
  • If your OS doesn't have sane byte range locks (OS X, BSD, older Linuxes) and multiple objects in your process use the same lock file, misoperation will occur. Use lock_files instead.
Todo:

Implement hole punching once I port that code from AFIO v1.

Decide on some resolution mechanism for sudden process exit.

There is a 1 in 2^64-2 chance of unique id collision. It would be nice if we formally checked that our chosen unique id is actually unique.
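
A minimal usage sketch follows. It is not taken from the library's own examples: the include and the afio_v2_xxx namespace placeholder follow the rest of this page, the lock file name is hypothetical, and entities_type is assumed to be constructible from a plain array of entity_type (it is a span-like type).

#include "atomic_append.hpp"

#include <string>

namespace shared_fs_mutex = afio_v2_xxx::algorithm::shared_fs_mutex;

void example()
{
  using shared_fs_mutex::atomic_append;

  // Open or create the shared append-only lock file. A default constructed
  // path_handle makes "mylockfile" relative to the current working directory.
  auto mutex = atomic_append::fs_mutex_append({}, "mylockfile").value();

  // Derive entity ids from names; exclusive=true requests exclusive (write) access.
  atomic_append::entity_type a = mutex.entity_from_string(std::string("table_a"), true);
  atomic_append::entity_type b = mutex.entity_from_string(std::string("table_b"), false);
  atomic_append::entity_type entities[] = {a, b};

  // Lock both entities in a single call; the returned guard releases them
  // when it goes out of scope.
  auto guard = mutex.lock(entities).value();

  // ... operate on the resources protected by a and b ...
}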

Member Function Documentation

◆ _lock()

virtual result<void> afio_v2_xxx::algorithm::shared_fs_mutex::atomic_append::_lock ( entities_guard &  out,
deadline  d,
bool  spin_not_sleep 
)
inline final protected virtual noexcept
Todo:
Read from header.last_known_good immediately if possible in order to avoid a duplicate read later

Implements afio_v2_xxx::algorithm::shared_fs_mutex::shared_fs_mutex.

{
  AFIO_LOG_FUNCTION_CALL(this);
  atomic_append_detail::lock_request lock_request;
  if(out.entities.size() > sizeof(lock_request.entities) / sizeof(lock_request.entities[0]))
  {
    return errc::argument_list_too_long;
  }

  std::chrono::steady_clock::time_point began_steady;
  std::chrono::system_clock::time_point end_utc;
  if(d)
  {
    if((d).steady)
    {
      began_steady = std::chrono::steady_clock::now();
    }
    else
    {
      end_utc = (d).to_time_point();
    }
  }
  // Fire this if an error occurs
  auto disableunlock = undoer([&] { out.release(); });

  // Write my lock request immediately
  memset(&lock_request, 0, sizeof(lock_request));
  lock_request.unique_id = _unique_id;
  auto count = std::chrono::system_clock::now() - std::chrono::system_clock::from_time_t(_header.time_offset);
  lock_request.us_count = std::chrono::duration_cast<std::chrono::microseconds>(count).count();
  lock_request.items = out.entities.size();
  memcpy(lock_request.entities, out.entities.data(), sizeof(lock_request.entities[0]) * out.entities.size());
  if(!_skip_hashing)
  {
    lock_request.hash = QUICKCPPLIB_NAMESPACE::algorithm::hash::fast_hash::hash((reinterpret_cast<char *>(&lock_request)) + 16, sizeof(lock_request) - 16);
  }
  // My lock request will be the file's current length or higher
  OUTCOME_TRY(my_lock_request_offset, _h.maximum_extent());
  {
    OUTCOME_TRYV(_h.set_append_only(true));
    auto undo = undoer([this] { (void) _h.set_append_only(false); });
    file_handle::extent_guard append_guard;
    if(_nfs_compatibility)
    {
      auto lastbyte = static_cast<file_handle::extent_type>(-1);
      // Lock up to the beginning of the shadow lock space
      lastbyte &= ~(1ULL << 63U);
      OUTCOME_TRY(append_guard_, _h.lock(my_lock_request_offset, lastbyte, true));
      append_guard = std::move(append_guard_);
    }
    OUTCOME_TRYV(_h.write(0, {{reinterpret_cast<byte *>(&lock_request), sizeof(lock_request)}}));
  }

  // Find the record I just wrote
  alignas(64) byte _buffer[4096 + 2048];  // 6Kb cache line aligned buffer
  // Read onwards from length as reported before I wrote my lock request
  // until I find my lock request. This loop should never actually iterate
  // except under extreme load conditions.
  //! \todo Read from header.last_known_good immediately if possible in order
  //! to avoid a duplicate read later
  for(;;)
  {
    file_handle::io_result<file_handle::buffers_type> readoutcome = _h.read(my_lock_request_offset, {{_buffer, sizeof(_buffer)}});
    // Should never happen :)
    if(readoutcome.has_error())
    {
      AFIO_LOG_FATAL(this, "atomic_append::lock() saw an error when searching for just written data");
      std::terminate();
    }
    const atomic_append_detail::lock_request *record, *lastrecord;
    for(record = reinterpret_cast<const atomic_append_detail::lock_request *>(readoutcome.value()[0].data), lastrecord = reinterpret_cast<const atomic_append_detail::lock_request *>(readoutcome.value()[0].data + readoutcome.value()[0].len); record < lastrecord && record->hash != lock_request.hash; ++record)
    {
      my_lock_request_offset += sizeof(atomic_append_detail::lock_request);
    }
    if(record->hash == lock_request.hash)
    {
      break;
    }
  }

  // extent_guard is now valid and will be unlocked on error
  out.hint = my_lock_request_offset;
  disableunlock.dismiss();

  // Lock my request for writing so others can sleep on me
  file_handle::extent_guard my_request_guard;
  if(!spin_not_sleep)
  {
    auto lock_offset = my_lock_request_offset;
    // Set the top bit to use the shadow lock space on Windows
    lock_offset |= (1ULL << 63U);
    OUTCOME_TRY(my_request_guard_, _h.lock(lock_offset, sizeof(lock_request), true));
    my_request_guard = std::move(my_request_guard_);
  }

  // Read every record preceding mine until header.first_known_good inclusive
  auto record_offset = my_lock_request_offset - sizeof(atomic_append_detail::lock_request);
  do
  {
  reload:
    // Refresh the header and load a snapshot of everything between record_offset
    // and first_known_good or -6Kb, whichever the sooner
    OUTCOME_TRYV(_read_header());
    // If there are no preceding records, we're done
    if(record_offset < _header.first_known_good)
    {
      break;
    }
    auto start_offset = record_offset;
    if(start_offset > sizeof(_buffer) - sizeof(atomic_append_detail::lock_request))
    {
      start_offset -= sizeof(_buffer) - sizeof(atomic_append_detail::lock_request);
    }
    else
    {
      start_offset = sizeof(atomic_append_detail::lock_request);
    }
    if(start_offset < _header.first_known_good)
    {
      start_offset = _header.first_known_good;
    }
    assert(record_offset >= start_offset);
    assert(record_offset - start_offset <= sizeof(_buffer));
    OUTCOME_TRY(batchread, _h.read(start_offset, {{_buffer, (size_t)(record_offset - start_offset) + sizeof(atomic_append_detail::lock_request)}}));
    assert(batchread[0].len == record_offset - start_offset + sizeof(atomic_append_detail::lock_request));
    const atomic_append_detail::lock_request *record = reinterpret_cast<atomic_append_detail::lock_request *>(batchread[0].data + batchread[0].len - sizeof(atomic_append_detail::lock_request));
    const atomic_append_detail::lock_request *firstrecord = reinterpret_cast<atomic_append_detail::lock_request *>(batchread[0].data);

    // Skip all completed lock requests or not mentioning any of my entities
    for(; record >= firstrecord; record_offset -= sizeof(atomic_append_detail::lock_request), --record)
    {
      // If a completed lock request, skip
      if(!record->hash && (record->unique_id == 0u))
      {
        continue;
      }
      // If record hash doesn't match contents it's a torn read, reload
      if(!_skip_hashing)
      {
        if(record->hash != QUICKCPPLIB_NAMESPACE::algorithm::hash::fast_hash::hash((reinterpret_cast<const char *>(record)) + 16, sizeof(atomic_append_detail::lock_request) - 16))
        {
          goto reload;
        }
      }

      // Does this record lock anything I am locking?
      for(const auto &entity : out.entities)
      {
        for(size_t n = 0; n < record->items; n++)
        {
          if(record->entities[n].value == entity.value)
          {
            // Is the lock I want exclusive or the lock he wants exclusive?
            // If so, need to block
            if((record->entities[n].exclusive != 0u) || (entity.exclusive != 0u))
            {
              goto beginwait;
            }
          }
        }
      }
    }
    // None of this batch of records has anything to do with my request, so keep going
    continue;

  beginwait:
    // Sleep until this record is freed using a shared lock
    // on the record in our way. Note there is a race here
    // between when the lock requester writes the lock
    // request and when he takes an exclusive lock on it,
    // so if our shared lock succeeds we need to immediately
    // unlock and retry based on the data.
    std::this_thread::yield();
    if(!spin_not_sleep)
    {
      deadline nd;
      if(d)
      {
        if((d).steady)
        {
          std::chrono::nanoseconds ns = std::chrono::duration_cast<std::chrono::nanoseconds>((began_steady + std::chrono::nanoseconds((d).nsecs)) - std::chrono::steady_clock::now());
          if(ns.count() < 0)
          {
            (nd).nsecs = 0;
          }
          else
          {
            (nd).nsecs = ns.count();
          }
        }
        else
        {
          (nd) = (d);
        }
      }
      auto lock_offset = record_offset;
      // Set the top bit to use the shadow lock space on Windows
      lock_offset |= (1ULL << 63U);
      OUTCOME_TRYV(_h.lock(lock_offset, sizeof(*record), false, nd));
    }
    // Make sure we haven't timed out during this wait
    if(d)
    {
      if((d).steady)
      {
        if(std::chrono::steady_clock::now() >= (began_steady + std::chrono::nanoseconds((d).nsecs)))
        {
          return errc::timed_out;
        }
      }
      else
      {
        if(std::chrono::system_clock::now() >= end_utc)
        {
          return errc::timed_out;
        }
      }
    }
  } while(record_offset >= _header.first_known_good);
  return success();
}
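
For orientation, the loop above appends a fixed-size lock_request record and then scans for it by hash: the first 16 bytes of each record hold the hash, which is why the hashing calls start at offset +16 and cover sizeof(lock_request) - 16 bytes, and each record can name at most twelve entities, which is where the twelve entity caveat above comes from. A rough sketch of the record shape implied by the listing follows; the field widths and ordering are assumptions, not the library's real atomic_append_detail::lock_request definition.

#include "atomic_append.hpp"

#include <cstdint>

// Illustrative only: inferred from the listing above.
struct lock_request_sketch
{
  std::uint8_t hash[16];    // first 16 bytes; the hash covers everything after them
  std::uint64_t unique_id;  // identifies the atomic_append instance which wrote the record
  std::uint64_t us_count;   // microseconds since the header's time_offset
  std::uint64_t items;      // how many slots of entities[] are in use
  afio_v2_xxx::algorithm::shared_fs_mutex::shared_fs_mutex::entity_type entities[12];  // the twelve entity limit
};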

◆ fs_mutex_append()

static result<atomic_append> afio_v2_xxx::algorithm::shared_fs_mutex::atomic_append::fs_mutex_append ( const path_handle &  base,
path_view  lockfile,
bool  nfs_compatibility = false,
bool  skip_hashing = false 
)
inline static noexcept

Initialises a shared filing system mutex using the file at lockfile

Returns
An implementation of shared_fs_mutex using the atomic_append algorithm.
Parameters
base               Optional base for the path to the file.
lockfile           The path to the file to use for IPC.
nfs_compatibility  Make this true if the lockfile could be accessed by NFS.
skip_hashing       Some filing systems (typically the copy on write ones e.g. ZFS, btrfs) guarantee atomicity of updates and therefore torn writes are never observed by readers. For these, hashing can be safely disabled.
Todo:
fs_mutex_append needs to check if file still exists after lock is granted, awaiting path fetching.
{
  AFIO_LOG_FUNCTION_CALL(0);
  OUTCOME_TRY(ret, file_handle::file(base, lockfile, file_handle::mode::write, file_handle::creation::if_needed, file_handle::caching::temporary));
  atomic_append_detail::header header;
  // Lock the entire header for exclusive access
  auto lockresult = ret.try_lock(0, sizeof(header), true);
  //! \todo fs_mutex_append needs to check if file still exists after lock is granted, awaiting path fetching.
  if(lockresult.has_error())
  {
    if(lockresult.error() != errc::timed_out)
    {
      return lockresult.error();
    }
    // Somebody else is also using this file
  }
  else
  {
    // I am the first person to be using this (stale?) file, so write a new header and truncate
    OUTCOME_TRYV(ret.truncate(sizeof(header)));
    memset(&header, 0, sizeof(header));
    header.time_offset = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
    header.first_known_good = sizeof(header);
    header.first_after_hole_punch = sizeof(header);
    if(!skip_hashing)
    {
      header.hash = QUICKCPPLIB_NAMESPACE::algorithm::hash::fast_hash::hash((reinterpret_cast<char *>(&header)) + 16, sizeof(header) - 16);
    }
    OUTCOME_TRYV(ret.write(0, {{reinterpret_cast<byte *>(&header), sizeof(header)}}));
  }
  // Open a shared lock on last byte in header to prevent other users zomping the file
  OUTCOME_TRY(guard, ret.lock(sizeof(header) - 1, 1, false));
  // Unlock any exclusive lock I gained earlier now
  if(lockresult)
  {
    lockresult.value().unlock();
  }
  // The constructor will read and cache the header
  return atomic_append(std::move(ret), std::move(guard), nfs_compatibility, skip_hashing);
}
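
As a brief illustration of the last two parameters (the lock file paths here are hypothetical, and the include and namespace placeholder follow the rest of this page): pass nfs_compatibility = true when the lock file may be reached over NFS, and consider skip_hashing = true on copy on write filing systems where torn writes cannot be observed.

#include "atomic_append.hpp"

namespace shared_fs_mutex = afio_v2_xxx::algorithm::shared_fs_mutex;

void open_mutexes()
{
  using shared_fs_mutex::atomic_append;

  // Lock file reachable over NFS: pay the extra byte range locking cost.
  auto nfs_mutex = atomic_append::fs_mutex_append({}, "exported/lockfile", true /*nfs_compatibility*/).value();

  // Lock file on a copy on write filing system (e.g. ZFS, btrfs): hashing can be skipped.
  auto cow_mutex = atomic_append::fs_mutex_append({}, "cow/lockfile", false, true /*skip_hashing*/).value();

  (void) nfs_mutex;
  (void) cow_mutex;
}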

The documentation for this class was generated from the following file: