split module
This commit is contained in:
parent
3f402a20a8
commit
9a592f946f
4 changed files with 378 additions and 336 deletions
85
P2P/Http/State.hs
Normal file
85
P2P/Http/State.hs
Normal file
|
@ -0,0 +1,85 @@
|
|||
{- P2P protocol over HTTP, server state
|
||||
-
|
||||
- https://git-annex.branchable.com/design/p2p_protocol_over_http/
|
||||
-
|
||||
- Copyright 2024 Joey Hess <id@joeyh.name>
|
||||
-
|
||||
- Licensed under the GNU AGPL version 3 or higher.
|
||||
-}
|
||||
|
||||
{-# LANGUAGE BangPatterns #-}
|
||||
|
||||
module P2P.Http.State where
|
||||
|
||||
import Annex.Common
|
||||
import P2P.Http.Types
|
||||
import Annex.UUID (genUUID)
|
||||
|
||||
import qualified Data.Map as M
|
||||
import Control.Concurrent.Async
|
||||
import Control.Concurrent.STM
|
||||
|
||||
data P2PHttpServerState = P2PHttpServerState
|
||||
{ openLocks :: TMVar (M.Map LockID Locker)
|
||||
}
|
||||
|
||||
mkP2PHttpServerState :: IO P2PHttpServerState
|
||||
mkP2PHttpServerState = P2PHttpServerState
|
||||
<$> newTMVarIO mempty
|
||||
|
||||
data Locker = Locker
|
||||
{ lockerThread :: Async ()
|
||||
, lockerVar :: TMVar Bool
|
||||
-- ^ Left empty until the thread has taken the lock
|
||||
-- (or failed to do so), then True while the lock is held,
|
||||
-- and setting to False causes the lock to be released.
|
||||
}
|
||||
|
||||
mkLocker :: IO () -> IO () -> IO (Maybe (Locker, LockID))
|
||||
mkLocker lock unlock = do
|
||||
lv <- newEmptyTMVarIO
|
||||
let setlocked = putTMVar lv
|
||||
tid <- async $
|
||||
tryNonAsync lock >>= \case
|
||||
Left _ -> do
|
||||
atomically $ setlocked False
|
||||
unlock
|
||||
Right () -> do
|
||||
atomically $ setlocked True
|
||||
atomically $ do
|
||||
v <- takeTMVar lv
|
||||
if v
|
||||
then retry
|
||||
else setlocked False
|
||||
unlock
|
||||
locksuccess <- atomically $ readTMVar lv
|
||||
if locksuccess
|
||||
then do
|
||||
lckid <- B64UUID <$> genUUID
|
||||
return (Just (Locker tid lv, lckid))
|
||||
else do
|
||||
wait tid
|
||||
return Nothing
|
||||
|
||||
storeLock :: LockID -> Locker -> P2PHttpServerState -> IO ()
|
||||
storeLock lckid locker st = atomically $ do
|
||||
m <- takeTMVar (openLocks st)
|
||||
let !m' = M.insert lckid locker m
|
||||
putTMVar (openLocks st) m'
|
||||
|
||||
dropLock :: LockID -> P2PHttpServerState -> IO ()
|
||||
dropLock lckid st = do
|
||||
v <- atomically $ do
|
||||
m <- takeTMVar (openLocks st)
|
||||
let (mlocker, !m') =
|
||||
M.updateLookupWithKey (\_ _ -> Nothing) lckid m
|
||||
putTMVar (openLocks st) m'
|
||||
case mlocker of
|
||||
Nothing -> return Nothing
|
||||
-- Signal to the locker's thread that it can release the lock.
|
||||
Just locker -> do
|
||||
_ <- swapTMVar (lockerVar locker) False
|
||||
return (Just locker)
|
||||
case v of
|
||||
Nothing -> return ()
|
||||
Just locker -> wait (lockerThread locker)
|
Loading…
Add table
Add a link
Reference in a new issue