{
  LLFIO_LOG_FUNCTION_CALL(this);
  atomic_append_detail::lock_request lock_request;
  if(out.entities.size() > sizeof(lock_request.entities) / sizeof(lock_request.entities[0]))
  {
    return errc::argument_list_too_long;
  }

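  // Deadline bookkeeping: a steady (relative) deadline is measured from now,
  // an absolute deadline is converted up front to a UTC time point.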
  std::chrono::steady_clock::time_point began_steady;
  std::chrono::system_clock::time_point end_utc;
  if(d)
  {
    if(d.steady)
    {
      began_steady = std::chrono::steady_clock::now();
    }
    else
    {
      end_utc = d.to_time_point();
    }
  }
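
  // If anything fails from here on, release whatever we acquired on scope exit.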
  auto disableunlock = make_scope_exit([&]() noexcept { out.release(); });

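  // Compose my lock request: zero it, then fill in my unique id, a timestamp
  // in microseconds relative to the header's time offset, and the entities I
  // want. Hash everything after the first 16 bytes (the hash field itself) so
  // readers can detect partially written records.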
  memset(&lock_request, 0, sizeof(lock_request));
  lock_request.unique_id = _unique_id;
  auto count = std::chrono::system_clock::now() - std::chrono::system_clock::from_time_t(_header.time_offset);
  lock_request.us_count = std::chrono::duration_cast<std::chrono::microseconds>(count).count();
  lock_request.items = out.entities.size();
  memcpy(lock_request.entities, out.entities.data(), sizeof(lock_request.entities[0]) * out.entities.size());
  if(!_skip_hashing)
  {
    lock_request.hash = QUICKCPPLIB_NAMESPACE::algorithm::hash::fast_hash::hash((reinterpret_cast<char *>(&lock_request)) + 16, sizeof(lock_request) - 16);
  }
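
  // Atomically append my request to the lock file. The file's length before
  // the append bounds where my record can land, so remember it. Appends are
  // not atomic on NFS, so in NFS compatibility mode additionally hold an
  // exclusive byte range lock from the old end of file up to the beginning of
  // the shadow lock space while writing.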
  OUTCOME_TRY(auto &&my_lock_request_offset, _h.maximum_extent());
  {
    OUTCOME_TRYV(_h.set_append_only(true));
    auto undo = make_scope_exit([this]() noexcept { (void) _h.set_append_only(false); });
    file_handle::extent_guard append_guard;
    if(_nfs_compatibility)
    {
      auto lastbyte = static_cast<file_handle::extent_type>(-1);
      // Leave the top bit clear: offsets with the top bit set are the shadow lock space
      lastbyte &= ~(1ULL << 63U);
      OUTCOME_TRY(auto &&append_guard_, _h.lock_file_range(my_lock_request_offset, lastbyte, lock_kind::exclusive));
      append_guard = std::move(append_guard_);
    }
    OUTCOME_TRYV(_h.write(0, {{reinterpret_cast<byte *>(&lock_request), sizeof(lock_request)}}));
  }
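
  // Find the record I just appended. It will be at my_lock_request_offset or
  // later if other processes appended their own requests concurrently, so
  // read forwards from there in 6Kb chunks until the record whose hash
  // matches mine appears.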
  alignas(64) byte _buffer[4096 + 2048];

  for(;;)
  {
    file_handle::buffer_type req{_buffer, sizeof(_buffer)};
    file_handle::io_result<file_handle::buffers_type> readoutcome = _h.read({{&req, 1}, my_lock_request_offset});
    // We just appended this data, so failing to read it back is unrecoverable
    if(readoutcome.has_error())
    {
      LLFIO_LOG_FATAL(this, "atomic_append::lock() saw an error when searching for just written data");
      std::terminate();
    }
    const atomic_append_detail::lock_request *record, *lastrecord;
    for(record = reinterpret_cast<const atomic_append_detail::lock_request *>(readoutcome.value()[0].data()),
        lastrecord = reinterpret_cast<const atomic_append_detail::lock_request *>(readoutcome.value()[0].data() + readoutcome.value()[0].size());
        record < lastrecord && record->hash != lock_request.hash; ++record)
    {
      my_lock_request_offset += sizeof(atomic_append_detail::lock_request);
    }
    if(record->hash == lock_request.hash)
    {
      break;
    }
  }
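
  // My record is now findable by other processes, so publish its offset as
  // the hint and disarm the scope-exit unlock: the lock attempt is now live.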
  out.hint = my_lock_request_offset;
  disableunlock.release();

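  // Take an exclusive byte range lock over my own record so later arrivals
  // can sleep on it rather than spin. The top bit addresses the shadow lock
  // space, keeping byte range locks clear of the real file contents.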
  file_handle::extent_guard my_request_guard;
  if(!spin_not_sleep)
  {
    auto lock_offset = my_lock_request_offset;
    // Use the shadow lock space for the byte range lock
    lock_offset |= (1ULL << 63U);
    OUTCOME_TRY(auto &&my_request_guard_, _h.lock_file_range(lock_offset, sizeof(lock_request), lock_kind::exclusive));
    my_request_guard = std::move(my_request_guard_);
  }

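  // Scan backwards from the record just before mine towards first_known_good,
  // looking for any earlier, still live request which conflicts with mine.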
  auto record_offset = my_lock_request_offset - sizeof(atomic_append_detail::lock_request);
  do
  {
  reload:
    // Refresh the header so first_known_good is current
    OUTCOME_TRYV(_read_header());
    // Nothing valid precedes my record, so the lock is mine
    if(record_offset < _header.first_known_good)
    {
      break;
    }
    // Read up to one buffer's worth of records ending at record_offset
    auto start_offset = record_offset;
    if(start_offset > sizeof(_buffer) - sizeof(atomic_append_detail::lock_request))
    {
      start_offset -= sizeof(_buffer) - sizeof(atomic_append_detail::lock_request);
    }
    else
    {
      start_offset = sizeof(atomic_append_detail::lock_request);
    }
    if(start_offset < _header.first_known_good)
    {
      start_offset = _header.first_known_good;
    }
    assert(record_offset >= start_offset);
    assert(record_offset - start_offset <= sizeof(_buffer));
    file_handle::buffer_type req{_buffer, (size_t)(record_offset - start_offset) + sizeof(atomic_append_detail::lock_request)};
    OUTCOME_TRY(auto &&batchread, _h.read({{&req, 1}, start_offset}));
    assert(batchread[0].size() == record_offset - start_offset + sizeof(atomic_append_detail::lock_request));
    const atomic_append_detail::lock_request *record = reinterpret_cast<atomic_append_detail::lock_request *>(batchread[0].data() + batchread[0].size() - sizeof(atomic_append_detail::lock_request));
    const atomic_append_detail::lock_request *firstrecord = reinterpret_cast<atomic_append_detail::lock_request *>(batchread[0].data());

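    // Walk the batch newest to oldest: all-zero records are completed
    // unlocks; a record whose hash does not match its contents is still being
    // written (reload and retry); anything else is checked against my entities.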
    for(; record >= firstrecord; record_offset -= sizeof(atomic_append_detail::lock_request), --record)
    {
      // All bits zero means a completed lock, so skip it
      if(!record->hash && (record->unique_id == 0u))
      {
        continue;
      }
      // A hash mismatch means the record is being written right now
      if(!_skip_hashing)
      {
        if(record->hash != QUICKCPPLIB_NAMESPACE::algorithm::hash::fast_hash::hash((reinterpret_cast<const char *>(record)) + 16, sizeof(atomic_append_detail::lock_request) - 16))
        {
          goto reload;
        }
      }
      // Does this record lock any entity I want to lock?
      for(const auto &entity : out.entities)
      {
        for(size_t n = 0; n < record->items; n++)
        {
          if(record->entities[n].value == entity.value)
          {
            // If either side wants exclusivity, I must wait
            if((record->entities[n].exclusive != 0u) || (entity.exclusive != 0u))
            {
              goto beginwait;
            }
          }
        }
      }
    }
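
    // This batch held no conflicting records, so loop to examine the batch
    // preceding it (record_offset already points there).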
    continue;

  beginwait:

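    // A conflicting record holds an entity I want. Yield, then (unless told
    // to spin) try to take a shared byte range lock on the conflicting record
    // in the shadow lock space: its owner holds it exclusively while alive,
    // so this efficiently sleeps until release or deadline expiry.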
    std::this_thread::yield();
    if(!spin_not_sleep)
    {
      deadline nd;
      // Compute the remaining deadline for the sleep
      if(d)
      {
        if(d.steady)
        {
          std::chrono::nanoseconds ns = std::chrono::duration_cast<std::chrono::nanoseconds>((began_steady + std::chrono::nanoseconds(d.nsecs)) - std::chrono::steady_clock::now());
          if(ns.count() < 0)
          {
            nd.nsecs = 0;
          }
          else
          {
            nd.nsecs = ns.count();
          }
        }
        else
        {
          nd = d;
        }
      }
      auto lock_offset = record_offset;
      // Use the shadow lock space for the byte range lock
      lock_offset |= (1ULL << 63U);
      (void) _h.lock_file_range(lock_offset, sizeof(*record), lock_kind::shared, nd);
    }
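
    // Check whether the deadline has passed before rescanning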
    if(d)
    {
      if(d.steady)
      {
        if(std::chrono::steady_clock::now() >= (began_steady + std::chrono::nanoseconds(d.nsecs)))
        {
          return errc::timed_out;
        }
      }
      else
      {
        if(std::chrono::system_clock::now() >= end_utc)
        {
          return errc::timed_out;
        }
      }
    }
  } while(record_offset >= _header.first_known_good);
  return success();
}
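
// Usage sketch (illustrative, not part of this header). It assumes LLFIO's
// public shared_fs_mutex::atomic_append interface: fs_mutex_append() to
// construct over a lock file, entity_from_string() to derive an entity id,
// and lock(), which ultimately runs the _lock() implementation above. Names
// and error handling are simplified; treat this as a sketch, not canon.
//
// namespace llfio = LLFIO_V2_NAMESPACE;
//
// void example(llfio::path_view lockfile)
// {
//   auto mtx = llfio::algorithm::shared_fs_mutex::atomic_append::fs_mutex_append({}, lockfile).value();
//   auto entity = mtx.entity_from_string("my resource");
//   auto guard = mtx.lock(entity).value();  // appends a lock_request, waits on conflicts
//   // ... critical section ...
// }  // guard unlocks on destruction, zeroing the record in the lock file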