AFIO  v2.00 late alpha
afio_v2_xxx::algorithm::shared_fs_mutex::atomic_append Class Reference

Scalable many entity shared/exclusive file system based lock. More...

#include "atomic_append.hpp"

Inheritance diagram for afio_v2_xxx::algorithm::shared_fs_mutex::atomic_append:
afio_v2_xxx::algorithm::shared_fs_mutex::shared_fs_mutex

Public Types

using entity_type = shared_fs_mutex::entity_type
 The type of an entity id.
 
using entities_type = shared_fs_mutex::entities_type
 The type of a sequence of entities.
 

Public Member Functions

 atomic_append (atomic_append &&o) noexcept
 Move constructor.
 
atomic_append & operator= (atomic_append &&o) noexcept
 Move assign.
 
const file_handle & handle () const noexcept
 Return the handle to file being used for this lock.
 
virtual void unlock (entities_type entities, unsigned long long hint) noexcept final
 Unlock a previously locked sequence of entities.
 
entity_type entity_from_buffer (const char *buffer, size_t bytes, bool exclusive=true) noexcept
 Generates an entity id from a sequence of bytes.
 
template<typename T >
entity_type entity_from_string (const std::basic_string< T > &str, bool exclusive=true) noexcept
 Generates an entity id from a string.
 
entity_type random_entity (bool exclusive=true) noexcept
 Generates a cryptographically random entity id.
 
void fill_random_entities (span< entity_type > seq, bool exclusive=true) noexcept
 Fills a sequence of entity ids with cryptographic randomness. Much faster than calling random_entity() individually.
 
result< entities_guard > lock (entities_type entities, deadline d=deadline(), bool spin_not_sleep=false) noexcept
 Lock all of a sequence of entities for exclusive or shared access.
 
result< entities_guard > lock (entity_type entity, deadline d=deadline(), bool spin_not_sleep=false) noexcept
 Lock a single entity for exclusive or shared access.
 
result< entities_guard > try_lock (entities_type entities) noexcept
 Try to lock all of a sequence of entities for exclusive or shared access.
 
result< entities_guard > try_lock (entity_type entity) noexcept
 Try to lock a single entity for exclusive or shared access.
 

Static Public Member Functions

static result< atomic_append > fs_mutex_append (const path_handle &base, path_view lockfile, bool nfs_compatibility=false, bool skip_hashing=false) noexcept
 

Protected Member Functions

virtual result< void > _lock (entities_guard &out, deadline d, bool spin_not_sleep) noexcept final
 

Detailed Description

Scalable many entity shared/exclusive file system based lock.

Lock files and byte ranges scale poorly in the number of items being concurrently locked, typically with an exponential drop-off in performance as the number of items being concurrently locked rises. This file system algorithm solves the problem using IPC via a shared append-only lock file. A usage sketch appears after the caveats below.

  • Compatible with networked file systems (including NFS, if the special nfs_compatibility flag is true; note that turning this on is not free of cost, so leave it off if you don't need NFS compatibility).
  • Nearly constant time with respect to the number of entities being locked.
  • Nearly constant time with respect to the number of processes concurrently using the lock (i.e. the number of waiters).
  • Can sleep until a lock becomes free in a power-efficient manner.
  • Sudden power loss during use is recovered from.

Caveats:

  • Much slower than byte_ranges for few waiters or a small number of entities.
  • Sudden process exit with locks held will deadlock all other users.
  • Maximum of twelve entities may be locked concurrently.
  • Wasteful of disk space if used on a non-extents based filing system (e.g. FAT32, ext3). It is best used in /tmp if possible (file_handle::temp_file()). If you really must use a non-extents based filing system, destroy and recreate the object instance periodically to force resetting the lock file's length to zero.
  • Similarly, older operating systems (e.g. Linux < 3.0) do not implement extent hole punching and therefore will also see excessive disk space consumption. Note that at the time of writing OS X doesn't implement hole punching at all.
  • If your OS doesn't have sane byte range locks (OS X, BSD, older Linuxes) and multiple objects in your process use the same lock file, misoperation will occur. Use lock_files instead.
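
A minimal usage sketch (not taken from the AFIO sources). It assumes the inline versioned namespace is spelled afio_v2_xxx as in this documentation, that a lock file named "mylockfile" in the current working directory is acceptable (a default constructed path_handle meaning "no base directory"), and that the returned entities_guard releases the lock when it goes out of scope:

  // Minimal usage sketch; names marked below are assumptions, not AFIO source.
  #include "atomic_append.hpp"
  #include <string>

  using afio_v2_xxx::algorithm::shared_fs_mutex::atomic_append;

  void example()
  {
    // Construct the mutex on a lock file in the current working directory
    // ({} = default constructed path_handle, i.e. no base directory).
    atomic_append mutex = atomic_append::fs_mutex_append({}, "mylockfile").value();

    // Derive an entity id from a name and request exclusive access to it.
    auto entity = mutex.entity_from_string(std::string("table_foo"), true);

    // Blocks until granted; the guard is assumed to unlock on destruction.
    auto guard = mutex.lock(entity).value();

    // ... operate on the resource identified by "table_foo" ...
  }
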
Todo:

Implement hole punching once I port that code from AFIO v1.

Decide on some resolution mechanism for sudden process exit.

There is a 1 in 2^64-2 chance of unique id collision. It would be nice if we formally checked that our chosen unique id really is unique.

Member Function Documentation

◆ _lock()

virtual result<void> afio_v2_xxx::algorithm::shared_fs_mutex::atomic_append::_lock (entities_guard &out, deadline d, bool spin_not_sleep) noexcept
inline final protected virtual
Todo:
Read from header.last_known_good immediately if possible in order to avoid a duplicate read later

Implements afio_v2_xxx::algorithm::shared_fs_mutex::shared_fs_mutex.

  {
    AFIO_LOG_FUNCTION_CALL(this);
    atomic_append_detail::lock_request lock_request;
    if(out.entities.size() > sizeof(lock_request.entities) / sizeof(lock_request.entities[0]))
    {
      return std::errc::argument_list_too_long;
    }

    std::chrono::steady_clock::time_point began_steady;
    std::chrono::system_clock::time_point end_utc;
    if(d)
    {
      if((d).steady)
      {
        began_steady = std::chrono::steady_clock::now();
      }
      else
      {
        end_utc = (d).to_time_point();
      }
    }
    // Fire this if an error occurs
    auto disableunlock = undoer([&] { out.release(); });

    // Write my lock request immediately
    memset(&lock_request, 0, sizeof(lock_request));
    lock_request.unique_id = _unique_id;
    auto count = std::chrono::system_clock::now() - std::chrono::system_clock::from_time_t(_header.time_offset);
    lock_request.us_count = std::chrono::duration_cast<std::chrono::microseconds>(count).count();
    lock_request.items = out.entities.size();
    memcpy(lock_request.entities, out.entities.data(), sizeof(lock_request.entities[0]) * out.entities.size());
    if(!_skip_hashing)
    {
      lock_request.hash = QUICKCPPLIB_NAMESPACE::algorithm::hash::fast_hash::hash((reinterpret_cast<char *>(&lock_request)) + 16, sizeof(lock_request) - 16);
    }
    // My lock request will be the file's current length or higher
    OUTCOME_TRY(my_lock_request_offset, _h.length());
    {
      OUTCOME_TRYV(_h.set_append_only(true));
      auto undo = undoer([this] { (void) _h.set_append_only(false); });
      file_handle::extent_guard append_guard;
      if(_nfs_compatibility)
      {
        auto lastbyte = static_cast<file_handle::extent_type>(-1);
        // Lock up to the beginning of the shadow lock space
        lastbyte &= ~(1ULL << 63);
        OUTCOME_TRY(append_guard_, _h.lock(my_lock_request_offset, lastbyte, true));
        append_guard = std::move(append_guard_);
      }
      OUTCOME_TRYV(_h.write(0, (char *) &lock_request, sizeof(lock_request)));
    }

    // Find the record I just wrote
    alignas(64) char _buffer[4096 + 2048];  // 6Kb cache line aligned buffer
    // Read onwards from length as reported before I wrote my lock request
    // until I find my lock request. This loop should never actually iterate
    // except under extreme load conditions.
    //! \todo Read from header.last_known_good immediately if possible in order
    //! to avoid a duplicate read later
    for(;;)
    {
      file_handle::io_result<file_handle::buffer_type> readoutcome = _h.read(my_lock_request_offset, _buffer, sizeof(_buffer));
      // Should never happen :)
      if(readoutcome.has_error())
      {
        AFIO_LOG_FATAL(this, "atomic_append::lock() saw an error when searching for just written data");
        std::terminate();
      }
      const atomic_append_detail::lock_request *record, *lastrecord;
      for(record = reinterpret_cast<const atomic_append_detail::lock_request *>(readoutcome.value().data), lastrecord = reinterpret_cast<const atomic_append_detail::lock_request *>(readoutcome.value().data + readoutcome.value().len); record < lastrecord && record->hash != lock_request.hash; ++record)
      {
        my_lock_request_offset += sizeof(atomic_append_detail::lock_request);
      }
      if(record->hash == lock_request.hash)
      {
        break;
      }
    }

    // extent_guard is now valid and will be unlocked on error
    out.hint = my_lock_request_offset;
    disableunlock.dismiss();

    // Lock my request for writing so others can sleep on me
    file_handle::extent_guard my_request_guard;
    if(!spin_not_sleep)
    {
      auto lock_offset = my_lock_request_offset;
      // Set the top bit to use the shadow lock space on Windows
      lock_offset |= (1ULL << 63);
      OUTCOME_TRY(my_request_guard_, _h.lock(lock_offset, sizeof(lock_request), true));
      my_request_guard = std::move(my_request_guard_);
    }

    // Read every record preceding mine until header.first_known_good inclusive
    auto record_offset = my_lock_request_offset - sizeof(atomic_append_detail::lock_request);
    do
    {
    reload:
      // Refresh the header and load a snapshot of everything between record_offset
      // and first_known_good or -6Kb, whichever the sooner
      OUTCOME_TRYV(_read_header());
      // If there are no preceding records, we're done
      if(record_offset < _header.first_known_good)
      {
        break;
      }
      auto start_offset = record_offset;
      if(start_offset > sizeof(_buffer) - sizeof(atomic_append_detail::lock_request))
      {
        start_offset -= sizeof(_buffer) - sizeof(atomic_append_detail::lock_request);
      }
      else
      {
        start_offset = sizeof(atomic_append_detail::lock_request);
      }
      if(start_offset < _header.first_known_good)
      {
        start_offset = _header.first_known_good;
      }
      assert(record_offset >= start_offset);
      assert(record_offset - start_offset <= sizeof(_buffer));
      OUTCOME_TRY(batchread, _h.read(start_offset, _buffer, (size_t)(record_offset - start_offset) + sizeof(atomic_append_detail::lock_request)));
      assert(batchread.len == record_offset - start_offset + sizeof(atomic_append_detail::lock_request));
      const atomic_append_detail::lock_request *record = reinterpret_cast<atomic_append_detail::lock_request *>(batchread.data + batchread.len - sizeof(atomic_append_detail::lock_request));
      const atomic_append_detail::lock_request *firstrecord = reinterpret_cast<atomic_append_detail::lock_request *>(batchread.data);

      // Skip all completed lock requests or not mentioning any of my entities
      for(; record >= firstrecord; record_offset -= sizeof(atomic_append_detail::lock_request), --record)
      {
        // If a completed lock request, skip
        if(!record->hash && (record->unique_id == 0u))
        {
          continue;
        }
        // If record hash doesn't match contents it's a torn read, reload
        if(!_skip_hashing)
        {
          if(record->hash != QUICKCPPLIB_NAMESPACE::algorithm::hash::fast_hash::hash(((char *) record) + 16, sizeof(atomic_append_detail::lock_request) - 16))
          {
            goto reload;
          }
        }

        // Does this record lock anything I am locking?
        for(const auto &entity : out.entities)
        {
          for(size_t n = 0; n < record->items; n++)
          {
            if(record->entities[n].value == entity.value)
            {
              // Is the lock I want exclusive or the lock he wants exclusive?
              // If so, need to block
              if((record->entities[n].exclusive != 0u) || (entity.exclusive != 0u))
              {
                goto beginwait;
              }
            }
          }
        }
      }
      // None of this batch of records has anything to do with my request, so keep going
      continue;

    beginwait:
      // Sleep until this record is freed using a shared lock
      // on the record in our way. Note there is a race here
      // between when the lock requester writes the lock
      // request and when he takes an exclusive lock on it,
      // so if our shared lock succeeds we need to immediately
      // unlock and retry based on the data.
      std::this_thread::yield();
      if(!spin_not_sleep)
      {
        deadline nd;
        if(d)
        {
          if((d).steady)
          {
            std::chrono::nanoseconds ns = std::chrono::duration_cast<std::chrono::nanoseconds>((began_steady + std::chrono::nanoseconds((d).nsecs)) - std::chrono::steady_clock::now());
            if(ns.count() < 0)
            {
              (nd).nsecs = 0;
            }
            else
            {
              (nd).nsecs = ns.count();
            }
          }
          else
          {
            (nd) = (d);
          }
        }
        auto lock_offset = record_offset;
        // Set the top bit to use the shadow lock space on Windows
        lock_offset |= (1ULL << 63);
        OUTCOME_TRYV(_h.lock(lock_offset, sizeof(record), false, nd));
      }
      // Make sure we haven't timed out during this wait
      if(d)
      {
        if((d).steady)
        {
          if(std::chrono::steady_clock::now() >= (began_steady + std::chrono::nanoseconds((d).nsecs)))
          {
            return std::errc::timed_out;
          }
        }
        else
        {
          if(std::chrono::system_clock::now() >= end_utc)
          {
            return std::errc::timed_out;
          }
        }
      }
    } while(record_offset >= _header.first_known_good);
    return success();
  }
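
For orientation when reading the loop above: each lock() appends one fixed-size lock_request record and then scans backwards over earlier records, skipping completed ones and sleeping on any record that mentions one of its entities with an incompatible access mode. The real record definition lives in atomic_append_detail; the following is only a sketch inferred from the fields the code touches (a hash over everything past the first 16 bytes, a unique id, a microsecond timestamp, an item count, and up to twelve entity ids), so the actual types, widths and padding may differ:

  // Inferred sketch only -- not the actual atomic_append_detail definitions.
  #include <cstdint>

  struct entity_type_sketch
  {
    std::uint64_t value : 63;     // the entity id (e.g. from entity_from_string())
    std::uint64_t exclusive : 1;  // whether exclusive access is requested
  };

  struct lock_request_sketch       // one record appended per lock() call
  {
    std::uint8_t hash[16];         // fast_hash of bytes 16..end, used to detect torn reads;
                                   // zero (together with unique_id == 0) marks a completed request
    std::uint64_t unique_id;       // unique id of the instance that wrote the request
    std::uint64_t us_count;        // microseconds since header.time_offset when the request was written
    std::uint8_t items;            // number of valid entries in entities[]
    entity_type_sketch entities[12];  // the entities being locked, hence the twelve entity maximum
  };
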

◆ fs_mutex_append()

static result<atomic_append> afio_v2_xxx::algorithm::shared_fs_mutex::atomic_append::fs_mutex_append (const path_handle &base, path_view lockfile, bool nfs_compatibility = false, bool skip_hashing = false) noexcept
inline static

Initialises a shared filing system mutex using the file at lockfile

Returns
An implementation of shared_fs_mutex using the atomic_append algorithm.
Parameters
base: Optional base for the path to the file.
lockfile: The path to the file to use for IPC.
nfs_compatibility: Make this true if the lockfile could be accessed by NFS.
skip_hashing: Some filing systems (typically the copy-on-write ones, e.g. ZFS, btrfs) guarantee atomicity of updates, and therefore torn writes are never observed by readers. For these, hashing can be safely disabled.
Todo:
fs_mutex_append needs to check if file still exists after lock is granted, awaiting path fetching.
  {
    AFIO_LOG_FUNCTION_CALL(0);
    OUTCOME_TRY(ret, file_handle::file(base, lockfile, file_handle::mode::write, file_handle::creation::if_needed, file_handle::caching::temporary));
    atomic_append_detail::header header;
    // Lock the entire header for exclusive access
    auto lockresult = ret.try_lock(0, sizeof(header), true);
    //! \todo fs_mutex_append needs to check if file still exists after lock is granted, awaiting path fetching.
    if(lockresult.has_error())
    {
      if(lockresult.error() != std::errc::timed_out)
      {
        return lockresult.error();
      }
      // Somebody else is also using this file
    }
    else
    {
      // I am the first person to be using this (stale?) file, so write a new header and truncate
      OUTCOME_TRYV(ret.truncate(sizeof(header)));
      memset(&header, 0, sizeof(header));
      header.time_offset = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
      header.first_known_good = sizeof(header);
      header.first_after_hole_punch = sizeof(header);
      if(!skip_hashing)
      {
        header.hash = QUICKCPPLIB_NAMESPACE::algorithm::hash::fast_hash::hash((reinterpret_cast<char *>(&header)) + 16, sizeof(header) - 16);
      }
      OUTCOME_TRYV(ret.write(0, (char *) &header, sizeof(header)));
    }
    // Open a shared lock on last byte in header to prevent other users zomping the file
    OUTCOME_TRY(guard, ret.lock(sizeof(header) - 1, 1, false));
    // Unlock any exclusive lock I gained earlier now
    if(lockresult)
    {
      lockresult.value().unlock();
    }
    // The constructor will read and cache the header
    return atomic_append(std::move(ret), std::move(guard), nfs_compatibility, skip_hashing);
  }
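
Since construction returns a result<atomic_append>, callers who prefer not to rely on .value() throwing can test the outcome explicitly. A short sketch, assuming a path_handle named lockdir (hypothetical) is already open on the directory that should hold the lock file:

  // Sketch only: lockdir is a hypothetical, already opened path_handle.
  auto r = atomic_append::fs_mutex_append(lockdir, "shared.lock",
                                          /* nfs_compatibility */ true);
  if(!r)
  {
    // Construction failed, e.g. the lock file could not be created or opened.
    return r.error();
  }
  auto mutex = std::move(r).value();
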

The documentation for this class was generated from the following file: