move DaemonStatus manipulation out of the Annex monad to IO
I've convinced myself that nothing in DaemonStatus can deadlock, as it always keepts the TMVar full. That was the only reason it was in the Annex monad.
This commit is contained in:
parent
a17fde22fa
commit
3cc1885793
9 changed files with 49 additions and 60 deletions
|
@ -65,10 +65,8 @@
|
||||||
- Annex monad in IO actions run by the watcher and committer
|
- Annex monad in IO actions run by the watcher and committer
|
||||||
- threads. Thus, a single state is shared amoung the threads, and
|
- threads. Thus, a single state is shared amoung the threads, and
|
||||||
- only one at a time can access it.
|
- only one at a time can access it.
|
||||||
- DaemonStatusHandle: (MVar)
|
- DaemonStatusHandle: (STM TMVar)
|
||||||
- The daemon's current status. This MVar should only be manipulated
|
- The daemon's current status.
|
||||||
- from inside the Annex monad, which ensures it's accessed only
|
|
||||||
- after the ThreadState MVar.
|
|
||||||
- ChangeChan: (STM TChan)
|
- ChangeChan: (STM TChan)
|
||||||
- Changes are indicated by writing to this channel. The committer
|
- Changes are indicated by writing to this channel. The committer
|
||||||
- reads from it.
|
- reads from it.
|
||||||
|
|
|
@ -15,7 +15,7 @@ import Utility.NotificationBroadcaster
|
||||||
import Logs.Transfer
|
import Logs.Transfer
|
||||||
import qualified Command.Sync
|
import qualified Command.Sync
|
||||||
|
|
||||||
import Control.Concurrent
|
import Control.Concurrent.STM
|
||||||
import System.Posix.Types
|
import System.Posix.Types
|
||||||
import Data.Time.Clock.POSIX
|
import Data.Time.Clock.POSIX
|
||||||
import Data.Time
|
import Data.Time
|
||||||
|
@ -41,7 +41,8 @@ data DaemonStatus = DaemonStatus
|
||||||
|
|
||||||
type TransferMap = M.Map Transfer TransferInfo
|
type TransferMap = M.Map Transfer TransferInfo
|
||||||
|
|
||||||
type DaemonStatusHandle = MVar DaemonStatus
|
{- This TMVar is never left empty, so accessing it will never block. -}
|
||||||
|
type DaemonStatusHandle = TMVar DaemonStatus
|
||||||
|
|
||||||
newDaemonStatus :: IO DaemonStatus
|
newDaemonStatus :: IO DaemonStatus
|
||||||
newDaemonStatus = do
|
newDaemonStatus = do
|
||||||
|
@ -56,21 +57,19 @@ newDaemonStatus = do
|
||||||
, notificationBroadcaster = nb
|
, notificationBroadcaster = nb
|
||||||
}
|
}
|
||||||
|
|
||||||
getDaemonStatus :: DaemonStatusHandle -> Annex DaemonStatus
|
getDaemonStatus :: DaemonStatusHandle -> IO DaemonStatus
|
||||||
getDaemonStatus = liftIO . readMVar
|
getDaemonStatus = atomically . readTMVar
|
||||||
|
|
||||||
modifyDaemonStatus_ :: DaemonStatusHandle -> (DaemonStatus -> DaemonStatus) -> Annex ()
|
modifyDaemonStatus_ :: DaemonStatusHandle -> (DaemonStatus -> DaemonStatus) -> IO ()
|
||||||
modifyDaemonStatus_ handle a = do
|
modifyDaemonStatus_ handle a = modifyDaemonStatus handle $ \s -> (a s, ())
|
||||||
nb <- liftIO $ modifyMVar handle $ \s -> return
|
|
||||||
(a s, notificationBroadcaster s)
|
|
||||||
liftIO $ sendNotification nb
|
|
||||||
|
|
||||||
modifyDaemonStatus :: DaemonStatusHandle -> (DaemonStatus -> (DaemonStatus, b)) -> Annex b
|
modifyDaemonStatus :: DaemonStatusHandle -> (DaemonStatus -> (DaemonStatus, b)) -> IO b
|
||||||
modifyDaemonStatus handle a = do
|
modifyDaemonStatus handle a = do
|
||||||
(b, nb) <- liftIO $ modifyMVar handle $ \s -> do
|
(b, nb) <- atomically $ do
|
||||||
let (s', b) = a s
|
(s, b) <- a <$> takeTMVar handle
|
||||||
return $ (s', (b, notificationBroadcaster s))
|
putTMVar handle s
|
||||||
liftIO $ sendNotification nb
|
return $ (b, notificationBroadcaster s)
|
||||||
|
sendNotification nb
|
||||||
return b
|
return b
|
||||||
|
|
||||||
{- Updates the cached ordered list of remotes from the list in Annex
|
{- Updates the cached ordered list of remotes from the list in Annex
|
||||||
|
@ -78,10 +77,10 @@ modifyDaemonStatus handle a = do
|
||||||
updateKnownRemotes :: DaemonStatusHandle -> Annex ()
|
updateKnownRemotes :: DaemonStatusHandle -> Annex ()
|
||||||
updateKnownRemotes dstatus = do
|
updateKnownRemotes dstatus = do
|
||||||
remotes <- Command.Sync.syncRemotes []
|
remotes <- Command.Sync.syncRemotes []
|
||||||
modifyDaemonStatus_ dstatus $
|
liftIO $ modifyDaemonStatus_ dstatus $
|
||||||
\s -> s { knownRemotes = remotes }
|
\s -> s { knownRemotes = remotes }
|
||||||
|
|
||||||
{- Load any previous daemon status file, and store it in the MVar for this
|
{- Load any previous daemon status file, and store it in a MVar for this
|
||||||
- process to use as its DaemonStatus. Also gets current transfer status. -}
|
- process to use as its DaemonStatus. Also gets current transfer status. -}
|
||||||
startDaemonStatus :: Annex DaemonStatusHandle
|
startDaemonStatus :: Annex DaemonStatusHandle
|
||||||
startDaemonStatus = do
|
startDaemonStatus = do
|
||||||
|
@ -90,7 +89,7 @@ startDaemonStatus = do
|
||||||
catchDefaultIO (readDaemonStatusFile file) =<< newDaemonStatus
|
catchDefaultIO (readDaemonStatusFile file) =<< newDaemonStatus
|
||||||
transfers <- M.fromList <$> getTransfers
|
transfers <- M.fromList <$> getTransfers
|
||||||
remotes <- Command.Sync.syncRemotes []
|
remotes <- Command.Sync.syncRemotes []
|
||||||
liftIO $ newMVar status
|
liftIO $ atomically $ newTMVar status
|
||||||
{ scanComplete = False
|
{ scanComplete = False
|
||||||
, sanityCheckRunning = False
|
, sanityCheckRunning = False
|
||||||
, currentTransfers = transfers
|
, currentTransfers = transfers
|
||||||
|
@ -102,18 +101,17 @@ startDaemonStatus = do
|
||||||
-}
|
-}
|
||||||
daemonStatusThread :: ThreadState -> DaemonStatusHandle -> IO ()
|
daemonStatusThread :: ThreadState -> DaemonStatusHandle -> IO ()
|
||||||
daemonStatusThread st handle = do
|
daemonStatusThread st handle = do
|
||||||
bhandle <- runThreadState st $
|
bhandle <- newNotificationHandle
|
||||||
liftIO . newNotificationHandle
|
=<< notificationBroadcaster <$> getDaemonStatus handle
|
||||||
=<< notificationBroadcaster <$> getDaemonStatus handle
|
|
||||||
checkpoint
|
checkpoint
|
||||||
runEvery (Seconds tenMinutes) $ do
|
runEvery (Seconds tenMinutes) $ do
|
||||||
liftIO $ waitNotification bhandle
|
waitNotification bhandle
|
||||||
checkpoint
|
checkpoint
|
||||||
where
|
where
|
||||||
checkpoint = runThreadState st $ do
|
checkpoint = do
|
||||||
file <- fromRepo gitAnnexDaemonStatusFile
|
|
||||||
status <- getDaemonStatus handle
|
status <- getDaemonStatus handle
|
||||||
liftIO $ writeDaemonStatusFile file status
|
file <- runThreadState st $ fromRepo gitAnnexDaemonStatusFile
|
||||||
|
writeDaemonStatusFile file status
|
||||||
|
|
||||||
{- Don't just dump out the structure, because it will change over time,
|
{- Don't just dump out the structure, because it will change over time,
|
||||||
- and parts of it are not relevant. -}
|
- and parts of it are not relevant. -}
|
||||||
|
@ -167,12 +165,12 @@ tenMinutes :: Int
|
||||||
tenMinutes = 10 * 60
|
tenMinutes = 10 * 60
|
||||||
|
|
||||||
{- Mutates the transfer map. -}
|
{- Mutates the transfer map. -}
|
||||||
adjustTransfers :: DaemonStatusHandle -> (TransferMap -> TransferMap) -> Annex ()
|
adjustTransfers :: DaemonStatusHandle -> (TransferMap -> TransferMap) -> IO ()
|
||||||
adjustTransfers dstatus a = modifyDaemonStatus_ dstatus $
|
adjustTransfers dstatus a = modifyDaemonStatus_ dstatus $
|
||||||
\s -> s { currentTransfers = a (currentTransfers s) }
|
\s -> s { currentTransfers = a (currentTransfers s) }
|
||||||
|
|
||||||
{- Removes a transfer from the map, and returns its info. -}
|
{- Removes a transfer from the map, and returns its info. -}
|
||||||
removeTransfer :: DaemonStatusHandle -> Transfer -> Annex (Maybe TransferInfo)
|
removeTransfer :: DaemonStatusHandle -> Transfer -> IO (Maybe TransferInfo)
|
||||||
removeTransfer dstatus t = modifyDaemonStatus dstatus go
|
removeTransfer dstatus t = modifyDaemonStatus dstatus go
|
||||||
where
|
where
|
||||||
go s =
|
go s =
|
||||||
|
|
|
@ -51,8 +51,7 @@ pushThread st daemonstatus commitchan pushmap = do
|
||||||
now <- getCurrentTime
|
now <- getCurrentTime
|
||||||
if shouldPush now commits
|
if shouldPush now commits
|
||||||
then do
|
then do
|
||||||
remotes <- runThreadState st $
|
remotes <- knownRemotes <$> getDaemonStatus daemonstatus
|
||||||
knownRemotes <$> getDaemonStatus daemonstatus
|
|
||||||
pushToRemotes thisThread now st (Just pushmap) remotes
|
pushToRemotes thisThread now st (Just pushmap) remotes
|
||||||
else do
|
else do
|
||||||
debug thisThread
|
debug thisThread
|
||||||
|
|
|
@ -26,32 +26,28 @@ thisThread = "SanityChecker"
|
||||||
{- This thread wakes up occasionally to make sure the tree is in good shape. -}
|
{- This thread wakes up occasionally to make sure the tree is in good shape. -}
|
||||||
sanityCheckerThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> ChangeChan -> IO ()
|
sanityCheckerThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> ChangeChan -> IO ()
|
||||||
sanityCheckerThread st status transferqueue changechan = forever $ do
|
sanityCheckerThread st status transferqueue changechan = forever $ do
|
||||||
waitForNextCheck st status
|
waitForNextCheck status
|
||||||
|
|
||||||
debug thisThread ["starting sanity check"]
|
debug thisThread ["starting sanity check"]
|
||||||
|
|
||||||
runThreadState st $
|
modifyDaemonStatus_ status $ \s -> s
|
||||||
modifyDaemonStatus_ status $ \s -> s
|
{ sanityCheckRunning = True }
|
||||||
{ sanityCheckRunning = True }
|
|
||||||
|
|
||||||
now <- getPOSIXTime -- before check started
|
now <- getPOSIXTime -- before check started
|
||||||
catchIO (check st status transferqueue changechan)
|
catchIO (check st status transferqueue changechan)
|
||||||
(runThreadState st . warning . show)
|
(runThreadState st . warning . show)
|
||||||
|
|
||||||
runThreadState st $ do
|
modifyDaemonStatus_ status $ \s -> s
|
||||||
modifyDaemonStatus_ status $ \s -> s
|
{ sanityCheckRunning = False
|
||||||
{ sanityCheckRunning = False
|
, lastSanityCheck = Just now
|
||||||
, lastSanityCheck = Just now
|
}
|
||||||
}
|
|
||||||
|
|
||||||
debug thisThread ["sanity check complete"]
|
debug thisThread ["sanity check complete"]
|
||||||
|
|
||||||
|
|
||||||
{- Only run one check per day, from the time of the last check. -}
|
{- Only run one check per day, from the time of the last check. -}
|
||||||
waitForNextCheck :: ThreadState -> DaemonStatusHandle -> IO ()
|
waitForNextCheck :: DaemonStatusHandle -> IO ()
|
||||||
waitForNextCheck st status = do
|
waitForNextCheck status = do
|
||||||
v <- runThreadState st $
|
v <- lastSanityCheck <$> getDaemonStatus status
|
||||||
lastSanityCheck <$> getDaemonStatus status
|
|
||||||
now <- getPOSIXTime
|
now <- getPOSIXTime
|
||||||
threadDelaySeconds $ Seconds $ calcdelay now v
|
threadDelaySeconds $ Seconds $ calcdelay now v
|
||||||
where
|
where
|
||||||
|
|
|
@ -55,12 +55,11 @@ onErr _ _ msg _ = error msg
|
||||||
onAdd :: Handler
|
onAdd :: Handler
|
||||||
onAdd st dstatus file _ = case parseTransferFile file of
|
onAdd st dstatus file _ = case parseTransferFile file of
|
||||||
Nothing -> noop
|
Nothing -> noop
|
||||||
Just t -> do
|
Just t -> go t =<< runThreadState st (checkTransfer t)
|
||||||
runThreadState st $ go t =<< checkTransfer t
|
|
||||||
where
|
where
|
||||||
go _ Nothing = noop -- transfer already finished
|
go _ Nothing = noop -- transfer already finished
|
||||||
go t (Just info) = do
|
go t (Just info) = do
|
||||||
liftIO $ debug thisThread
|
debug thisThread
|
||||||
[ "transfer starting:"
|
[ "transfer starting:"
|
||||||
, show t
|
, show t
|
||||||
]
|
]
|
||||||
|
@ -71,11 +70,11 @@ onAdd st dstatus file _ = case parseTransferFile file of
|
||||||
|
|
||||||
{- Called when a transfer information file is removed. -}
|
{- Called when a transfer information file is removed. -}
|
||||||
onDel :: Handler
|
onDel :: Handler
|
||||||
onDel st dstatus file _ = case parseTransferFile file of
|
onDel _ dstatus file _ = case parseTransferFile file of
|
||||||
Nothing -> noop
|
Nothing -> noop
|
||||||
Just t -> do
|
Just t -> do
|
||||||
debug thisThread
|
debug thisThread
|
||||||
[ "transfer finishing:"
|
[ "transfer finishing:"
|
||||||
, show t
|
, show t
|
||||||
]
|
]
|
||||||
void $ runThreadState st $ removeTransfer dstatus t
|
void $ removeTransfer dstatus t
|
||||||
|
|
|
@ -48,7 +48,7 @@ transfererThread st dstatus transferqueue slots = go
|
||||||
- being uploaded to isn't known to have the file. -}
|
- being uploaded to isn't known to have the file. -}
|
||||||
shouldTransfer :: DaemonStatusHandle -> Transfer -> TransferInfo -> Annex Bool
|
shouldTransfer :: DaemonStatusHandle -> Transfer -> TransferInfo -> Annex Bool
|
||||||
shouldTransfer dstatus t info =
|
shouldTransfer dstatus t info =
|
||||||
go =<< currentTransfers <$> getDaemonStatus dstatus
|
go =<< currentTransfers <$> liftIO (getDaemonStatus dstatus)
|
||||||
where
|
where
|
||||||
go m
|
go m
|
||||||
| M.member t m = return False
|
| M.member t m = return False
|
||||||
|
@ -84,7 +84,7 @@ transferThread st dstatus slots t info = case (transferRemote info, associatedFi
|
||||||
tid <- inTransferSlot slots st $
|
tid <- inTransferSlot slots st $
|
||||||
transferprocess remote file
|
transferprocess remote file
|
||||||
now <- getCurrentTime
|
now <- getCurrentTime
|
||||||
runThreadState st $ adjustTransfers dstatus $
|
adjustTransfers dstatus $
|
||||||
M.insertWith' const t info
|
M.insertWith' const t info
|
||||||
{ startedTime = Just $ utcTimeToPOSIXSeconds now
|
{ startedTime = Just $ utcTimeToPOSIXSeconds now
|
||||||
, transferTid = Just tid
|
, transferTid = Just tid
|
||||||
|
|
|
@ -76,8 +76,7 @@ statupScan st dstatus scanner = do
|
||||||
runThreadState st $
|
runThreadState st $
|
||||||
showAction "scanning"
|
showAction "scanning"
|
||||||
r <- scanner
|
r <- scanner
|
||||||
runThreadState st $
|
modifyDaemonStatus_ dstatus $ \s -> s { scanComplete = True }
|
||||||
modifyDaemonStatus_ dstatus $ \s -> s { scanComplete = True }
|
|
||||||
|
|
||||||
-- Notice any files that were deleted before watching was started.
|
-- Notice any files that were deleted before watching was started.
|
||||||
runThreadState st $ do
|
runThreadState st $ do
|
||||||
|
@ -132,7 +131,7 @@ runHandler threadname st dstatus transferqueue changechan handler file filestatu
|
||||||
onAdd :: Handler
|
onAdd :: Handler
|
||||||
onAdd threadname file filestatus dstatus _
|
onAdd threadname file filestatus dstatus _
|
||||||
| maybe False isRegularFile filestatus = do
|
| maybe False isRegularFile filestatus = do
|
||||||
ifM (scanComplete <$> getDaemonStatus dstatus)
|
ifM (scanComplete <$> liftIO (getDaemonStatus dstatus))
|
||||||
( go
|
( go
|
||||||
, ifM (null <$> inRepo (Git.LsFiles.notInRepo False [file]))
|
, ifM (null <$> inRepo (Git.LsFiles.notInRepo False [file]))
|
||||||
( noChange
|
( noChange
|
||||||
|
@ -156,7 +155,7 @@ onAddSymlink threadname file filestatus dstatus transferqueue = go =<< Backend.l
|
||||||
link <- calcGitLink file key
|
link <- calcGitLink file key
|
||||||
ifM ((==) link <$> liftIO (readSymbolicLink file))
|
ifM ((==) link <$> liftIO (readSymbolicLink file))
|
||||||
( do
|
( do
|
||||||
s <- getDaemonStatus dstatus
|
s <- liftIO $ getDaemonStatus dstatus
|
||||||
checkcontent key s
|
checkcontent key s
|
||||||
ensurestaged link s
|
ensurestaged link s
|
||||||
, do
|
, do
|
||||||
|
@ -167,7 +166,7 @@ onAddSymlink threadname file filestatus dstatus transferqueue = go =<< Backend.l
|
||||||
)
|
)
|
||||||
go Nothing = do -- other symlink
|
go Nothing = do -- other symlink
|
||||||
link <- liftIO (readSymbolicLink file)
|
link <- liftIO (readSymbolicLink file)
|
||||||
ensurestaged link =<< getDaemonStatus dstatus
|
ensurestaged link =<< liftIO (getDaemonStatus dstatus)
|
||||||
|
|
||||||
{- This is often called on symlinks that are already
|
{- This is often called on symlinks that are already
|
||||||
- staged correctly. A symlink may have been deleted
|
- staged correctly. A symlink may have been deleted
|
||||||
|
|
|
@ -115,7 +115,7 @@ statusDisplay = do
|
||||||
|
|
||||||
current <- liftIO $ runThreadState (threadState webapp) $
|
current <- liftIO $ runThreadState (threadState webapp) $
|
||||||
M.toList . currentTransfers
|
M.toList . currentTransfers
|
||||||
<$> getDaemonStatus (daemonStatus webapp)
|
<$> liftIO (getDaemonStatus $ daemonStatus webapp)
|
||||||
queued <- liftIO $ getTransferQueue $ transferQueue webapp
|
queued <- liftIO $ getTransferQueue $ transferQueue webapp
|
||||||
let transfers = current ++ queued
|
let transfers = current ++ queued
|
||||||
|
|
||||||
|
|
|
@ -59,7 +59,7 @@ stubInfo f r = TransferInfo
|
||||||
{- Adds transfers to queue for some of the known remotes. -}
|
{- Adds transfers to queue for some of the known remotes. -}
|
||||||
queueTransfers :: Schedule -> TransferQueue -> DaemonStatusHandle -> Key -> AssociatedFile -> Direction -> Annex ()
|
queueTransfers :: Schedule -> TransferQueue -> DaemonStatusHandle -> Key -> AssociatedFile -> Direction -> Annex ()
|
||||||
queueTransfers schedule q daemonstatus k f direction = do
|
queueTransfers schedule q daemonstatus k f direction = do
|
||||||
rs <- knownRemotes <$> getDaemonStatus daemonstatus
|
rs <- knownRemotes <$> liftIO (getDaemonStatus daemonstatus)
|
||||||
mapM_ go =<< sufficientremotes rs
|
mapM_ go =<< sufficientremotes rs
|
||||||
where
|
where
|
||||||
sufficientremotes rs
|
sufficientremotes rs
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue