You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
importredisimporttimeimportuuidfromcontextlibimportcontextmanager# Initialize Redis clientredis_client=redis.Redis(host='localhost', port=6379, db=0)
defacquire_lock(lock_name, lock_timeout=5, acquire_timeout=10):
"""Attempt to acquire a Redis lock."""identifier=str(uuid.uuid4()) # A unique identifier for this lock ownerlock_name=f"lock:{lock_name}"# Prefix the lock nameend=time.time() +acquire_timeoutwhiletime.time() <end:
# Try to acquire the lock using NX and EX to set if not exists, with a timeoutifredis_client.set(lock_name, identifier, ex=lock_timeout, nx=True):
returnidentifiertime.sleep(0.01) # Short sleep to avoid spamming the Redis serverraiseTimeoutError(f"Could not acquire lock for {lock_name}")
defrelease_lock(lock_name, identifier):
"""Release a Redis lock if the lock is still held by the current identifier."""lock_name=f"lock:{lock_name}"pipe=redis_client.pipeline(True)
try:
pipe.watch(lock_name)
ifpipe.get(lock_name) ==identifier.encode():
pipe.multi()
pipe.delete(lock_name)
pipe.execute()
returnTruepipe.unwatch()
exceptredis.exceptions.WatchError:
passreturnFalse@contextmanagerdefredis_lock(lock_name, lock_timeout=5, acquire_timeout=10):
"""Context manager to handle acquiring and releasing Redis locks."""identifier=acquire_lock(lock_name, lock_timeout, acquire_timeout)
try:
yieldfinally:
release_lock(lock_name, identifier)
# Critical section: payment processing functiondefprocess_payment(order_id, thread_name):
try:
withredis_lock(f"order:{order_id}:lock", lock_timeout=5, acquire_timeout=10):
print(f"[{thread_name}] Processing payment for order {order_id}")
time.sleep(3) # Simulate payment processing timeprint(f"[{thread_name}] Payment processed for order {order_id}")
exceptTimeoutErrorase:
print(f"[{thread_name}] {str(e)}")
# Simulate multiple threads trying to process the same orderif__name__=="__main__":
importthreadingorder_id=123# Create multiple threads trying to process the same order concurrentlythreads= []
foriinrange(5):
thread_name=f"Thread-{i+1}"thread=threading.Thread(target=process_payment, args=(order_id, thread_name))
threads.append(thread)
# Start all threadsforthreadinthreads:
thread.start()
# Wait for all threads to finishforthreadinthreads:
thread.join()
The text was updated successfully, but these errors were encountered:
rednafi
changed the title
Distributed locking from first principles
Distributed locking
Oct 28, 2024
rednafi
changed the title
Distributed locking
Writing a simple distributed lock from scratch
Oct 28, 2024
The text was updated successfully, but these errors were encountered: