pushed Assistant monad down into DaemonStatus code
Currently have three old versions of functions that more reworking is needed to remove: getDaemonStatusOld, modifyDaemonStatusOld_, and modifyDaemonStatusOld
This commit is contained in:
parent
ea8df8fe9f
commit
47d94eb9a4
20 changed files with 141 additions and 152 deletions
|
@ -31,11 +31,16 @@ getDaemonStatusOld = atomically . readTMVar
|
|||
getDaemonStatus :: Assistant DaemonStatus
|
||||
getDaemonStatus = (atomically . readTMVar) <<~ daemonStatusHandle
|
||||
|
||||
modifyDaemonStatus_ :: DaemonStatusHandle -> (DaemonStatus -> DaemonStatus) -> IO ()
|
||||
modifyDaemonStatus_ dstatus a = modifyDaemonStatus dstatus $ \s -> (a s, ())
|
||||
-- TODO remove this
|
||||
modifyDaemonStatusOld_ :: DaemonStatusHandle -> (DaemonStatus -> DaemonStatus) -> IO ()
|
||||
modifyDaemonStatusOld_ dstatus a = modifyDaemonStatusOld dstatus $ \s -> (a s, ())
|
||||
|
||||
modifyDaemonStatus :: DaemonStatusHandle -> (DaemonStatus -> (DaemonStatus, b)) -> IO b
|
||||
modifyDaemonStatus dstatus a = do
|
||||
modifyDaemonStatus_ :: (DaemonStatus -> DaemonStatus) -> Assistant ()
|
||||
modifyDaemonStatus_ a = modifyDaemonStatus $ \s -> (a s, ())
|
||||
|
||||
-- TODO remove this
|
||||
modifyDaemonStatusOld :: DaemonStatusHandle -> (DaemonStatus -> (DaemonStatus, b)) -> IO b
|
||||
modifyDaemonStatusOld dstatus a = do
|
||||
(s, b) <- atomically $ do
|
||||
r@(s, _) <- a <$> takeTMVar dstatus
|
||||
putTMVar dstatus s
|
||||
|
@ -43,6 +48,17 @@ modifyDaemonStatus dstatus a = do
|
|||
sendNotification $ changeNotifier s
|
||||
return b
|
||||
|
||||
modifyDaemonStatus :: (DaemonStatus -> (DaemonStatus, b)) -> Assistant b
|
||||
modifyDaemonStatus a = do
|
||||
dstatus <- getAssistant daemonStatusHandle
|
||||
liftIO $ do
|
||||
(s, b) <- atomically $ do
|
||||
r@(s, _) <- a <$> takeTMVar dstatus
|
||||
putTMVar dstatus s
|
||||
return r
|
||||
sendNotification $ changeNotifier s
|
||||
return b
|
||||
|
||||
{- Syncable remotes ordered by cost. -}
|
||||
calcSyncRemotes :: Annex [Remote]
|
||||
calcSyncRemotes = do
|
||||
|
@ -53,11 +69,10 @@ calcSyncRemotes = do
|
|||
return $ filter good rs
|
||||
|
||||
{- Updates the sycRemotes list from the list of all remotes in Annex state. -}
|
||||
updateSyncRemotes :: DaemonStatusHandle -> Annex ()
|
||||
updateSyncRemotes dstatus = do
|
||||
remotes <- calcSyncRemotes
|
||||
liftIO $ modifyDaemonStatus_ dstatus $
|
||||
\s -> s { syncRemotes = remotes }
|
||||
updateSyncRemotes :: Assistant ()
|
||||
updateSyncRemotes = do
|
||||
remotes <- liftAnnex calcSyncRemotes
|
||||
modifyDaemonStatus_ $ \s -> s { syncRemotes = remotes }
|
||||
|
||||
{- Load any previous daemon status file, and store it in a MVar for this
|
||||
- process to use as its DaemonStatus. Also gets current transfer status. -}
|
||||
|
@ -136,15 +151,14 @@ adjustTransfersSTM dstatus a = do
|
|||
putTMVar dstatus $ s { currentTransfers = a (currentTransfers s) }
|
||||
|
||||
{- Alters a transfer's info, if the transfer is in the map. -}
|
||||
alterTransferInfo :: DaemonStatusHandle -> Transfer -> (TransferInfo -> TransferInfo) -> IO ()
|
||||
alterTransferInfo dstatus t a = updateTransferInfo' dstatus $ M.adjust a t
|
||||
alterTransferInfo :: Transfer -> (TransferInfo -> TransferInfo) -> Assistant ()
|
||||
alterTransferInfo t a = updateTransferInfo' $ M.adjust a t
|
||||
|
||||
{- Updates a transfer's info. Adds the transfer to the map if necessary,
|
||||
- or if already present, updates it while preserving the old transferTid,
|
||||
- transferPaused, and bytesComplete values, which are not written to disk. -}
|
||||
updateTransferInfo :: DaemonStatusHandle -> Transfer -> TransferInfo -> IO ()
|
||||
updateTransferInfo dstatus t info = updateTransferInfo' dstatus $
|
||||
M.insertWith' merge t info
|
||||
updateTransferInfo :: Transfer -> TransferInfo -> Assistant ()
|
||||
updateTransferInfo t info = updateTransferInfo' $ M.insertWith' merge t info
|
||||
where
|
||||
merge new old = new
|
||||
{ transferTid = maybe (transferTid new) Just (transferTid old)
|
||||
|
@ -152,52 +166,59 @@ updateTransferInfo dstatus t info = updateTransferInfo' dstatus $
|
|||
, bytesComplete = maybe (bytesComplete new) Just (bytesComplete old)
|
||||
}
|
||||
|
||||
updateTransferInfo' :: DaemonStatusHandle -> (TransferMap -> TransferMap) -> IO ()
|
||||
updateTransferInfo' dstatus a =
|
||||
notifyTransfer dstatus `after` modifyDaemonStatus_ dstatus go
|
||||
updateTransferInfo' :: (TransferMap -> TransferMap) -> Assistant ()
|
||||
updateTransferInfo' a = notifyTransfer `after` modifyDaemonStatus_ update
|
||||
where
|
||||
go s = s { currentTransfers = a (currentTransfers s) }
|
||||
update s = s { currentTransfers = a (currentTransfers s) }
|
||||
|
||||
{- Removes a transfer from the map, and returns its info. -}
|
||||
removeTransfer :: DaemonStatusHandle -> Transfer -> IO (Maybe TransferInfo)
|
||||
removeTransfer dstatus t =
|
||||
notifyTransfer dstatus `after` modifyDaemonStatus dstatus go
|
||||
removeTransfer :: Transfer -> Assistant (Maybe TransferInfo)
|
||||
removeTransfer t = notifyTransfer `after` modifyDaemonStatus remove
|
||||
where
|
||||
go s =
|
||||
remove s =
|
||||
let (info, ts) = M.updateLookupWithKey
|
||||
(\_k _v -> Nothing)
|
||||
t (currentTransfers s)
|
||||
in (s { currentTransfers = ts }, info)
|
||||
|
||||
{- Send a notification when a transfer is changed. -}
|
||||
notifyTransfer :: DaemonStatusHandle -> IO ()
|
||||
notifyTransfer dstatus = sendNotification
|
||||
notifyTransfer :: Assistant ()
|
||||
notifyTransfer = do
|
||||
dstatus <- getAssistant daemonStatusHandle
|
||||
liftIO $ sendNotification
|
||||
=<< transferNotifier <$> atomically (readTMVar dstatus)
|
||||
|
||||
-- TODO remove
|
||||
notifyTransferOld :: DaemonStatusHandle -> IO ()
|
||||
notifyTransferOld dstatus = sendNotification
|
||||
=<< transferNotifier <$> atomically (readTMVar dstatus)
|
||||
|
||||
{- Send a notification when alerts are changed. -}
|
||||
notifyAlert :: DaemonStatusHandle -> IO ()
|
||||
notifyAlert dstatus = sendNotification
|
||||
=<< alertNotifier <$> atomically (readTMVar dstatus)
|
||||
notifyAlert :: Assistant ()
|
||||
notifyAlert = do
|
||||
dstatus <- getAssistant daemonStatusHandle
|
||||
liftIO $ sendNotification
|
||||
=<< alertNotifier <$> atomically (readTMVar dstatus)
|
||||
|
||||
{- Returns the alert's identifier, which can be used to remove it. -}
|
||||
addAlert :: DaemonStatusHandle -> Alert -> IO AlertId
|
||||
addAlert dstatus alert = notifyAlert dstatus `after` modifyDaemonStatus dstatus go
|
||||
addAlert :: Alert -> Assistant AlertId
|
||||
addAlert alert = notifyAlert `after` modifyDaemonStatus add
|
||||
where
|
||||
go s = (s { lastAlertId = i, alertMap = m }, i)
|
||||
add s = (s { lastAlertId = i, alertMap = m }, i)
|
||||
where
|
||||
i = nextAlertId $ lastAlertId s
|
||||
m = mergeAlert i alert (alertMap s)
|
||||
|
||||
removeAlert :: DaemonStatusHandle -> AlertId -> IO ()
|
||||
removeAlert dstatus i = updateAlert dstatus i (const Nothing)
|
||||
removeAlert :: AlertId -> Assistant ()
|
||||
removeAlert i = updateAlert i (const Nothing)
|
||||
|
||||
updateAlert :: DaemonStatusHandle -> AlertId -> (Alert -> Maybe Alert) -> IO ()
|
||||
updateAlert dstatus i a = updateAlertMap dstatus $ \m -> M.update a i m
|
||||
updateAlert :: AlertId -> (Alert -> Maybe Alert) -> Assistant ()
|
||||
updateAlert i a = updateAlertMap $ \m -> M.update a i m
|
||||
|
||||
updateAlertMap :: DaemonStatusHandle -> (AlertMap -> AlertMap) -> IO ()
|
||||
updateAlertMap dstatus a = notifyAlert dstatus `after` modifyDaemonStatus_ dstatus go
|
||||
updateAlertMap :: (AlertMap -> AlertMap) -> Assistant ()
|
||||
updateAlertMap a = notifyAlert `after` modifyDaemonStatus_ update
|
||||
where
|
||||
go s = s { alertMap = a (alertMap s) }
|
||||
update s = s { alertMap = a (alertMap s) }
|
||||
|
||||
{- Displays an alert while performing an activity that returns True on
|
||||
- success.
|
||||
|
@ -213,17 +234,13 @@ alertWhile alert a = alertWhile' alert $ do
|
|||
alertWhile' :: Alert -> Assistant (Bool, a) -> Assistant a
|
||||
alertWhile' alert a = do
|
||||
let alert' = alert { alertClass = Activity }
|
||||
dstatus <- getAssistant daemonStatusHandle
|
||||
i <- liftIO $ addAlert dstatus alert'
|
||||
i <- addAlert alert'
|
||||
(ok, r) <- a
|
||||
liftIO $ updateAlertMap dstatus $
|
||||
mergeAlert i $ makeAlertFiller ok alert'
|
||||
updateAlertMap $ mergeAlert i $ makeAlertFiller ok alert'
|
||||
return r
|
||||
|
||||
{- Displays an alert while performing an activity, then removes it. -}
|
||||
alertDuring :: Alert -> Assistant a -> Assistant a
|
||||
alertDuring alert a = do
|
||||
let alert' = alert { alertClass = Activity }
|
||||
dstatus <- getAssistant daemonStatusHandle
|
||||
i <- liftIO $ addAlert dstatus alert'
|
||||
liftIO (removeAlert dstatus i) `after` a
|
||||
i <- addAlert $ alert { alertClass = Activity }
|
||||
removeAlert i `after` a
|
||||
|
|
|
@ -13,7 +13,6 @@ module Assistant.Monad (
|
|||
newAssistantData,
|
||||
runAssistant,
|
||||
getAssistant,
|
||||
withAssistant,
|
||||
liftAnnex,
|
||||
(<~>),
|
||||
(<<~),
|
||||
|
@ -110,6 +109,3 @@ asIO2 a = do
|
|||
{- Runs an IO action on a selected field of the AssistantData. -}
|
||||
(<<~) :: (a -> IO b) -> (AssistantData -> a) -> Assistant b
|
||||
io <<~ v = reader v >>= liftIO . io
|
||||
|
||||
withAssistant :: (AssistantData -> a) -> (a -> IO b) -> Assistant b
|
||||
withAssistant v io = io <<~ v
|
||||
|
|
|
@ -26,5 +26,5 @@ runNamedThread (NamedThread name a) = do
|
|||
let msg = unwords [name, "crashed:", show e]
|
||||
hPutStrLn stderr msg
|
||||
-- TODO click to restart
|
||||
void $ addAlert (daemonStatusHandle d) $
|
||||
warningAlert name msg
|
||||
flip runAssistant d $ void $
|
||||
addAlert $ warningAlert name msg
|
||||
|
|
|
@ -80,7 +80,7 @@ startSending :: DaemonStatusHandle -> PairingInProgress -> PairStage -> (PairSta
|
|||
startSending dstatus pip stage sender = void $ forkIO $ do
|
||||
tid <- myThreadId
|
||||
let pip' = pip { inProgressPairStage = stage, inProgressThreadId = Just tid }
|
||||
oldpip <- modifyDaemonStatus dstatus $
|
||||
oldpip <- modifyDaemonStatusOld dstatus $
|
||||
\s -> (s { pairingInProgress = Just pip' }, pairingInProgress s)
|
||||
maybe noop stopold oldpip
|
||||
sender stage
|
||||
|
@ -90,7 +90,7 @@ startSending dstatus pip stage sender = void $ forkIO $ do
|
|||
stopSending :: PairingInProgress -> DaemonStatusHandle -> IO ()
|
||||
stopSending pip dstatus = do
|
||||
maybe noop killThread $ inProgressThreadId pip
|
||||
modifyDaemonStatus_ dstatus $ \s -> s { pairingInProgress = Nothing }
|
||||
modifyDaemonStatusOld_ dstatus $ \s -> s { pairingInProgress = Nothing }
|
||||
|
||||
class ToSomeAddr a where
|
||||
toSomeAddr :: a -> SomeAddr
|
||||
|
|
|
@ -154,6 +154,6 @@ manualPull currentbranch remotes = do
|
|||
{- Start syncing a newly added remote, using a background thread. -}
|
||||
syncNewRemote :: Remote -> Assistant ()
|
||||
syncNewRemote remote = do
|
||||
liftAnnex . updateSyncRemotes =<< getAssistant daemonStatusHandle
|
||||
updateSyncRemotes
|
||||
thread <- asIO2 reconnectRemotes
|
||||
void $ liftIO $ forkIO $ thread False [remote]
|
||||
|
|
|
@ -74,7 +74,7 @@ reloadConfigs changedconfigs = do
|
|||
{- Changes to the remote log, or the trust log, can affect the
|
||||
- syncRemotes list -}
|
||||
when (Logs.Remote.remoteLog `elem` fs || Logs.Trust.trustLog `elem` fs) $
|
||||
liftAnnex . updateSyncRemotes =<< getAssistant daemonStatusHandle
|
||||
updateSyncRemotes
|
||||
where
|
||||
(fs, as) = unzip $ filter (flip S.member changedfiles . fst)
|
||||
configFilesActions
|
||||
|
|
|
@ -177,7 +177,7 @@ remotesUnder dir = do
|
|||
let (waschanged, rs') = unzip pairs
|
||||
when (any id waschanged) $ do
|
||||
liftAnnex $ Annex.changeState $ \s -> s { Annex.remotes = rs' }
|
||||
liftAnnex . updateSyncRemotes =<< getAssistant daemonStatusHandle
|
||||
updateSyncRemotes
|
||||
return $ map snd $ filter fst pairs
|
||||
where
|
||||
checkremote repotop r = case Remote.localpath r of
|
||||
|
|
|
@ -104,12 +104,12 @@ pairReqReceived :: Bool -> UrlRenderer -> PairMsg -> Assistant ()
|
|||
pairReqReceived True _ _ = noop -- ignore our own PairReq
|
||||
pairReqReceived False urlrenderer msg = do
|
||||
url <- liftIO $ renderUrl urlrenderer (FinishPairR msg) []
|
||||
dstatus <- getAssistant daemonStatusHandle
|
||||
liftIO $ void $ addAlert dstatus $ pairRequestReceivedAlert repo
|
||||
close <- asIO removeAlert
|
||||
void $ addAlert $ pairRequestReceivedAlert repo
|
||||
AlertButton
|
||||
{ buttonUrl = url
|
||||
, buttonLabel = T.pack "Respond"
|
||||
, buttonAction = Just $ removeAlert dstatus
|
||||
, buttonAction = Just close
|
||||
}
|
||||
where
|
||||
repo = pairRepo msg
|
||||
|
|
|
@ -28,14 +28,12 @@ sanityCheckerThread = NamedThread "SanityChecker" $ forever $ do
|
|||
debug ["sanity check complete"]
|
||||
where
|
||||
go = do
|
||||
dstatus <- getAssistant daemonStatusHandle
|
||||
liftIO $ modifyDaemonStatus_ dstatus $ \s -> s
|
||||
{ sanityCheckRunning = True }
|
||||
modifyDaemonStatus_ $ \s -> s { sanityCheckRunning = True }
|
||||
|
||||
now <- liftIO $ getPOSIXTime -- before check started
|
||||
r <- either showerr return =<< tryIO <~> check
|
||||
|
||||
liftIO $ modifyDaemonStatus_ dstatus $ \s -> s
|
||||
modifyDaemonStatus_ $ \s -> s
|
||||
{ sanityCheckRunning = False
|
||||
, lastSanityCheck = Just now
|
||||
}
|
||||
|
@ -84,8 +82,7 @@ check = do
|
|||
slop = fromIntegral tenMinutes
|
||||
insanity msg = do
|
||||
liftAnnex $ warning msg
|
||||
dstatus <- getAssistant daemonStatusHandle
|
||||
liftIO $ void $ addAlert dstatus $ sanityCheckFixAlert msg
|
||||
void $ addAlert $ sanityCheckFixAlert msg
|
||||
addsymlink file s = do
|
||||
Watcher.runHandler Watcher.onAddSymlink file s
|
||||
insanity $ "found unstaged symlink: " ++ file
|
||||
|
|
|
@ -52,7 +52,5 @@ transferPollerThread = NamedThread "TransferPoller" $ do
|
|||
|
||||
newsize t info sz
|
||||
| bytesComplete info /= sz && isJust sz =
|
||||
withAssistant daemonStatusHandle $ \h ->
|
||||
alterTransferInfo h t $
|
||||
\i -> i { bytesComplete = sz }
|
||||
alterTransferInfo t $ \i -> i { bytesComplete = sz }
|
||||
| otherwise = noop
|
||||
|
|
|
@ -64,8 +64,7 @@ onAdd file = case parseTransferFile file of
|
|||
debug [ "transfer starting:", show t]
|
||||
r <- headMaybe . filter (sameuuid t)
|
||||
<$> liftAnnex Remote.remoteList
|
||||
dstatus <- getAssistant daemonStatusHandle
|
||||
liftIO $ updateTransferInfo dstatus t info { transferRemote = r }
|
||||
updateTransferInfo t info { transferRemote = r }
|
||||
sameuuid t r = Remote.uuid r == transferUUID t
|
||||
|
||||
{- Called when a transfer information file is updated.
|
||||
|
@ -79,9 +78,8 @@ onModify file = do
|
|||
Just t -> go t =<< liftIO (readTransferInfoFile Nothing file)
|
||||
where
|
||||
go _ Nothing = noop
|
||||
go t (Just newinfo) = withAssistant daemonStatusHandle $ \h ->
|
||||
alterTransferInfo h t $
|
||||
\i -> i { bytesComplete = bytesComplete newinfo }
|
||||
go t (Just newinfo) = alterTransferInfo t $
|
||||
\i -> i { bytesComplete = bytesComplete newinfo }
|
||||
|
||||
{- This thread can only watch transfer sizes when the DirWatcher supports
|
||||
- tracking modificatons to files. -}
|
||||
|
@ -94,7 +92,7 @@ onDel file = case parseTransferFile file of
|
|||
Nothing -> noop
|
||||
Just t -> do
|
||||
debug [ "transfer finishing:", show t]
|
||||
minfo <- flip removeTransfer t <<~ daemonStatusHandle
|
||||
minfo <- removeTransfer t
|
||||
|
||||
finished <- asIO2 finishedTransfer
|
||||
void $ liftIO $ forkIO $ do
|
||||
|
|
|
@ -32,9 +32,8 @@ transfererThread = NamedThread "Transferr" $ do
|
|||
program <- liftIO readProgramFile
|
||||
transferqueue <- getAssistant transferQueue
|
||||
dstatus <- getAssistant daemonStatusHandle
|
||||
slots <- getAssistant transferSlots
|
||||
starter <- asIO2 $ startTransfer program
|
||||
liftIO $ forever $ inTransferSlot dstatus slots $
|
||||
forever $ inTransferSlot $ liftIO $
|
||||
maybe (return Nothing) (uncurry starter)
|
||||
=<< getNextTransfer transferqueue dstatus notrunning
|
||||
where
|
||||
|
@ -48,12 +47,12 @@ startTransfer program t info = case (transferRemote info, associatedFile info) o
|
|||
(Just remote, Just file) -> ifM (liftAnnex $ shouldTransfer t info)
|
||||
( do
|
||||
debug [ "Transferring:" , show t ]
|
||||
notifyTransfer <<~ daemonStatusHandle
|
||||
notifyTransfer
|
||||
tp <- asIO2 transferprocess
|
||||
return $ Just (t, info, tp remote file)
|
||||
, do
|
||||
debug [ "Skipping unnecessary transfer:" , show t ]
|
||||
void $ flip removeTransfer t <<~ daemonStatusHandle
|
||||
void $ removeTransfer t
|
||||
return Nothing
|
||||
)
|
||||
_ -> return Nothing
|
||||
|
@ -77,10 +76,8 @@ startTransfer program t info = case (transferRemote info, associatedFile info) o
|
|||
- in the transfer.
|
||||
-}
|
||||
whenM (liftIO $ (==) ExitSuccess <$> waitForProcess pid) $ do
|
||||
dstatus <- getAssistant daemonStatusHandle
|
||||
liftIO $ void $ addAlert dstatus $
|
||||
makeAlertFiller True $
|
||||
transferFileAlert direction True file
|
||||
void $ addAlert $ makeAlertFiller True $
|
||||
transferFileAlert direction True file
|
||||
recordCommit
|
||||
where
|
||||
params =
|
||||
|
|
|
@ -85,9 +85,7 @@ startupScan scanner = do
|
|||
inRepo $ Git.Command.run "add" [Param "--update"]
|
||||
showAction "started"
|
||||
|
||||
dstatus <- getAssistant daemonStatusHandle
|
||||
liftIO $ modifyDaemonStatus_ dstatus $
|
||||
\s -> s { scanComplete = True }
|
||||
modifyDaemonStatus_ $ \s -> s { scanComplete = True }
|
||||
|
||||
return (True, r)
|
||||
|
||||
|
@ -218,8 +216,7 @@ onDelDir dir _ = do
|
|||
onErr :: Handler
|
||||
onErr msg _ = do
|
||||
liftAnnex $ warning msg
|
||||
dstatus <- getAssistant daemonStatusHandle
|
||||
void $ liftIO $ addAlert dstatus $ warningAlert "watcher" msg
|
||||
void $ addAlert $ warningAlert "watcher" msg
|
||||
noChange
|
||||
|
||||
{- Adds a symlink to the index, without ever accessing the actual symlink
|
||||
|
|
|
@ -123,7 +123,7 @@ enqueue schedule q dstatus t info
|
|||
atomically $ do
|
||||
void $ modifyTVar' (queuesize q) succ
|
||||
void $ modifyTVar' (queuelist q) modlist
|
||||
void $ notifyTransfer dstatus
|
||||
void $ notifyTransferOld dstatus
|
||||
|
||||
{- Adds a transfer to the queue. -}
|
||||
queueTransfer :: Schedule -> TransferQueue -> DaemonStatusHandle -> AssociatedFile -> Transfer -> Remote -> IO ()
|
||||
|
@ -182,7 +182,7 @@ dequeueTransfers :: TransferQueue -> DaemonStatusHandle -> (Transfer -> Bool) ->
|
|||
dequeueTransfers q dstatus c = do
|
||||
removed <- atomically $ dequeueTransfersSTM q c
|
||||
unless (null removed) $
|
||||
notifyTransfer dstatus
|
||||
notifyTransferOld dstatus
|
||||
return removed
|
||||
|
||||
dequeueTransfersSTM :: TransferQueue -> (Transfer -> Bool) -> STM [(Transfer, TransferInfo)]
|
||||
|
|
|
@ -17,20 +17,22 @@ import qualified Control.Exception as E
|
|||
import Control.Concurrent
|
||||
import qualified Control.Concurrent.MSemN as MSemN
|
||||
|
||||
type TransferGenerator = Assistant (Maybe (Transfer, TransferInfo, IO ()))
|
||||
|
||||
{- Waits until a transfer slot becomes available, then runs a
|
||||
- TransferGenerator, and then runs the transfer action in its own thread.
|
||||
-}
|
||||
inTransferSlot :: TransferSlotRunner
|
||||
inTransferSlot dstatus s gen = do
|
||||
MSemN.wait s 1
|
||||
runTransferThread dstatus s =<< gen
|
||||
inTransferSlot :: TransferGenerator -> Assistant ()
|
||||
inTransferSlot gen = do
|
||||
flip MSemN.wait 1 <<~ transferSlots
|
||||
runTransferThread =<< gen
|
||||
|
||||
{- Runs a TransferGenerator, and its transfer action,
|
||||
- without waiting for a slot to become available. -}
|
||||
inImmediateTransferSlot :: TransferSlotRunner
|
||||
inImmediateTransferSlot dstatus s gen = do
|
||||
MSemN.signal s (-1)
|
||||
runTransferThread dstatus s =<< gen
|
||||
inImmediateTransferSlot :: TransferGenerator -> Assistant ()
|
||||
inImmediateTransferSlot gen = do
|
||||
flip MSemN.signal (-1) <<~ transferSlots
|
||||
runTransferThread =<< gen
|
||||
|
||||
{- Runs a transfer action, in an already allocated transfer slot.
|
||||
- Once it finishes, frees the transfer slot.
|
||||
|
@ -42,24 +44,26 @@ inImmediateTransferSlot dstatus s gen = do
|
|||
- then pausing the thread until a ResumeTransfer exception is raised,
|
||||
- then rerunning the action.
|
||||
-}
|
||||
runTransferThread :: DaemonStatusHandle -> TransferSlots -> Maybe (Transfer, TransferInfo, IO ()) -> IO ()
|
||||
runTransferThread _ s Nothing = MSemN.signal s 1
|
||||
runTransferThread dstatus s (Just (t, info, a)) = do
|
||||
tid <- forkIO go
|
||||
updateTransferInfo dstatus t $ info { transferTid = Just tid }
|
||||
runTransferThread :: Maybe (Transfer, TransferInfo, IO ()) -> Assistant ()
|
||||
runTransferThread Nothing = flip MSemN.signal 1 <<~ transferSlots
|
||||
runTransferThread (Just (t, info, a)) = do
|
||||
d <- getAssistant id
|
||||
tid <- liftIO $ forkIO $ go d
|
||||
updateTransferInfo t $ info { transferTid = Just tid }
|
||||
where
|
||||
go = catchPauseResume a
|
||||
pause = catchPauseResume $ runEvery (Seconds 86400) noop
|
||||
go d = catchPauseResume d a
|
||||
pause d = catchPauseResume d $ runEvery (Seconds 86400) noop
|
||||
{- Note: This must use E.try, rather than E.catch.
|
||||
- When E.catch is used, and has called go in its exception
|
||||
- handler, Control.Concurrent.throwTo will block sometimes
|
||||
- when signaling. Using E.try avoids the problem. -}
|
||||
catchPauseResume a' = do
|
||||
catchPauseResume d a' = do
|
||||
r <- E.try a' :: IO (Either E.SomeException ())
|
||||
case r of
|
||||
Left e -> case E.fromException e of
|
||||
Just PauseTransfer -> pause
|
||||
Just ResumeTransfer -> go
|
||||
_ -> done
|
||||
_ -> done
|
||||
done = MSemN.signal s 1
|
||||
Just PauseTransfer -> pause d
|
||||
Just ResumeTransfer -> go d
|
||||
_ -> done d
|
||||
_ -> done d
|
||||
done d = flip runAssistant d $
|
||||
flip MSemN.signal 1 <<~ transferSlots
|
||||
|
|
|
@ -9,9 +9,6 @@
|
|||
|
||||
module Assistant.Types.TransferSlots where
|
||||
|
||||
import Assistant.Types.DaemonStatus
|
||||
import Logs.Transfer
|
||||
|
||||
import qualified Control.Exception as E
|
||||
import qualified Control.Concurrent.MSemN as MSemN
|
||||
import Data.Typeable
|
||||
|
@ -25,9 +22,6 @@ data TransferException = PauseTransfer | ResumeTransfer
|
|||
|
||||
instance E.Exception TransferException
|
||||
|
||||
type TransferSlotRunner = DaemonStatusHandle -> TransferSlots -> TransferGenerator -> IO ()
|
||||
type TransferGenerator = IO (Maybe (Transfer, TransferInfo, IO ()))
|
||||
|
||||
{- Number of concurrent transfers allowed to be run from the assistant.
|
||||
-
|
||||
- Transfers launched by other means, including by remote assistants,
|
||||
|
|
|
@ -69,7 +69,6 @@ setRepoConfig uuid mremote oldc newc = do
|
|||
when (repoSyncable oldc /= repoSyncable newc) $
|
||||
changeSyncable mremote (repoSyncable newc)
|
||||
when (isJust mremote && repoName oldc /= repoName newc) $ do
|
||||
dstatus <- getAssistantY daemonStatusHandle
|
||||
runAnnex undefined $ do
|
||||
name <- fromRepo $ uniqueRemoteName (T.unpack $ repoName newc) 0
|
||||
inRepo $ Git.Command.run "remote"
|
||||
|
@ -78,7 +77,7 @@ setRepoConfig uuid mremote oldc newc = do
|
|||
, Param name
|
||||
]
|
||||
void $ Remote.remoteListRefresh
|
||||
updateSyncRemotes dstatus
|
||||
runAssistantY updateSyncRemotes
|
||||
|
||||
editRepositoryAForm :: RepoConfig -> AForm WebApp WebApp RepoConfig
|
||||
editRepositoryAForm def = RepoConfig
|
||||
|
|
|
@ -34,13 +34,14 @@ import qualified Data.Text as T
|
|||
{- Displays an alert suggesting to configure XMPP, with a button. -}
|
||||
xmppNeeded :: Handler ()
|
||||
xmppNeeded = whenM (isNothing <$> runAnnex Nothing getXMPPCreds) $ do
|
||||
dstatus <- getAssistantY daemonStatusHandle
|
||||
urlrender <- getUrlRender
|
||||
void $ liftIO $ addAlert dstatus $ xmppNeededAlert $ AlertButton
|
||||
{ buttonLabel = "Configure a Jabber account"
|
||||
, buttonUrl = urlrender XMPPR
|
||||
, buttonAction = Just $ removeAlert dstatus
|
||||
}
|
||||
void $ runAssistantY $ do
|
||||
close <- asIO removeAlert
|
||||
addAlert $ xmppNeededAlert $ AlertButton
|
||||
{ buttonLabel = "Configure a Jabber account"
|
||||
, buttonUrl = urlrender XMPPR
|
||||
, buttonAction = Just close
|
||||
}
|
||||
|
||||
getXMPPR :: Handler RepHtml
|
||||
#ifdef WITH_XMPP
|
||||
|
|
|
@ -73,9 +73,7 @@ getSideBarR nid = do
|
|||
|
||||
{- Called by the client to close an alert. -}
|
||||
getCloseAlert :: AlertId -> Handler ()
|
||||
getCloseAlert i = do
|
||||
dstatus <- getAssistantY daemonStatusHandle
|
||||
liftIO $ removeAlert dstatus i
|
||||
getCloseAlert = runAssistantY . removeAlert
|
||||
|
||||
{- When an alert with a button is clicked on, the button takes us here. -}
|
||||
getClickAlert :: AlertId -> Handler ()
|
||||
|
|
|
@ -38,7 +38,7 @@ changeSyncable (Just r) False = do
|
|||
changeSyncFlag r False
|
||||
d <- getAssistantY id
|
||||
let dstatus = daemonStatusHandle d
|
||||
runAssistantY $ liftAnnex $ updateSyncRemotes dstatus
|
||||
runAssistantY $ updateSyncRemotes
|
||||
{- Stop all transfers to or from this remote.
|
||||
- XXX Can't stop any ongoing scan, or git syncs. -}
|
||||
void $ liftIO $ dequeueTransfers (transferQueue d) dstatus tofrom
|
||||
|
@ -67,30 +67,27 @@ pauseTransfer = cancelTransfer True
|
|||
|
||||
cancelTransfer :: Bool -> Transfer -> Handler ()
|
||||
cancelTransfer pause t = do
|
||||
dstatus <- getAssistantY daemonStatusHandle
|
||||
tq <- getAssistantY transferQueue
|
||||
m <- getCurrentTransfers
|
||||
liftIO $ do
|
||||
unless pause $
|
||||
{- remove queued transfer -}
|
||||
void $ dequeueTransfers tq dstatus $
|
||||
equivilantTransfer t
|
||||
{- stop running transfer -}
|
||||
maybe noop (stop dstatus) (M.lookup t m)
|
||||
dstatus <- getAssistantY daemonStatusHandle
|
||||
unless pause $ liftIO $
|
||||
{- remove queued transfer -}
|
||||
void $ dequeueTransfers tq dstatus $
|
||||
equivilantTransfer t
|
||||
{- stop running transfer -}
|
||||
maybe noop stop (M.lookup t m)
|
||||
where
|
||||
stop dstatus info = do
|
||||
stop info = runAssistantY $ do
|
||||
{- When there's a thread associated with the
|
||||
- transfer, it's signaled first, to avoid it
|
||||
- displaying any alert about the transfer having
|
||||
- failed when the transfer process is killed. -}
|
||||
maybe noop signalthread $ transferTid info
|
||||
maybe noop killproc $ transferPid info
|
||||
liftIO $ maybe noop signalthread $ transferTid info
|
||||
liftIO $ maybe noop killproc $ transferPid info
|
||||
if pause
|
||||
then void $
|
||||
alterTransferInfo dstatus t $
|
||||
\i -> i { transferPaused = True }
|
||||
else void $
|
||||
removeTransfer dstatus t
|
||||
then void $ alterTransferInfo t $
|
||||
\i -> i { transferPaused = True }
|
||||
else void $ removeTransfer t
|
||||
signalthread tid
|
||||
| pause = throwTo tid PauseTransfer
|
||||
| otherwise = killThread tid
|
||||
|
@ -115,16 +112,12 @@ startTransfer t = do
|
|||
is <- liftIO $ map snd <$> getMatchingTransfers q dstatus (== t)
|
||||
maybe noop start $ headMaybe is
|
||||
resume tid = do
|
||||
dstatus <- getAssistantY daemonStatusHandle
|
||||
liftIO $ do
|
||||
alterTransferInfo dstatus t $
|
||||
\i -> i { transferPaused = False }
|
||||
throwTo tid ResumeTransfer
|
||||
runAssistantY $ alterTransferInfo t $
|
||||
\i -> i { transferPaused = False }
|
||||
liftIO $ throwTo tid ResumeTransfer
|
||||
start info = runAssistantY $ do
|
||||
program <- liftIO readProgramFile
|
||||
dstatus <- getAssistant daemonStatusHandle
|
||||
slots <- getAssistant transferSlots
|
||||
inImmediateTransferSlot dstatus slots <~>
|
||||
inImmediateTransferSlot $
|
||||
Transferrer.startTransfer program t info
|
||||
|
||||
getCurrentTransfers :: Handler TransferMap
|
||||
|
|
Loading…
Reference in a new issue