Assistant monad, stage 3

All toplevel named threads are converted to the Assistant monad.

Some utility functions still need to be converted.
This commit is contained in:
Joey Hess 2012-10-29 14:07:12 -04:00
parent 1df7417403
commit 67ce7929a5
7 changed files with 174 additions and 182 deletions

View file

@ -178,13 +178,6 @@ startAssistant assistant daemonize webappwaiter = withThreadState $ \st -> do
where where
go = do go = do
d <- getAssistant id d <- getAssistant id
st <- getAssistant threadState
dstatus <- getAssistant daemonStatusHandle
commitchan <- getAssistant commitChan
transferqueue <- getAssistant transferQueue
transferslots <- getAssistant transferSlots
scanremotes <- getAssistant scanRemoteMap
pushnotifier <- getAssistant pushNotifier
#ifdef WITH_WEBAPP #ifdef WITH_WEBAPP
urlrenderer <- liftIO newUrlRenderer urlrenderer <- liftIO newUrlRenderer
#endif #endif
@ -193,7 +186,7 @@ startAssistant assistant daemonize webappwaiter = withThreadState $ \st -> do
#ifdef WITH_WEBAPP #ifdef WITH_WEBAPP
, assist $ webAppThread d urlrenderer False Nothing webappwaiter , assist $ webAppThread d urlrenderer False Nothing webappwaiter
#ifdef WITH_PAIRING #ifdef WITH_PAIRING
, assist $ pairListenerThread st dstatus scanremotes urlrenderer , assist $ pairListenerThread urlrenderer
#endif #endif
#endif #endif
, assist $ pushThread , assist $ pushThread
@ -201,7 +194,7 @@ startAssistant assistant daemonize webappwaiter = withThreadState $ \st -> do
, assist $ mergeThread , assist $ mergeThread
, assist $ transferWatcherThread , assist $ transferWatcherThread
, assist $ transferPollerThread , assist $ transferPollerThread
, assist $ transfererThread st dstatus transferqueue transferslots commitchan , assist $ transfererThread
, assist $ daemonStatusThread , assist $ daemonStatusThread
, assist $ sanityCheckerThread , assist $ sanityCheckerThread
, assist $ mountWatcherThread , assist $ mountWatcherThread

View file

@ -8,9 +8,6 @@
module Assistant.Pairing.MakeRemote where module Assistant.Pairing.MakeRemote where
import Assistant.Common import Assistant.Common
import Assistant.ThreadedMonad
import Assistant.DaemonStatus
import Assistant.ScanRemotes
import Assistant.Ssh import Assistant.Ssh
import Assistant.Pairing import Assistant.Pairing
import Assistant.Pairing.Network import Assistant.Pairing.Network
@ -31,13 +28,12 @@ setupAuthorizedKeys msg = do
{- When pairing is complete, this is used to set up the remote for the host {- When pairing is complete, this is used to set up the remote for the host
- we paired with. -} - we paired with. -}
finishedPairing :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PairMsg -> SshKeyPair -> IO () finishedPairing :: PairMsg -> SshKeyPair -> Assistant ()
finishedPairing st dstatus scanremotes msg keypair = do finishedPairing msg keypair = do
sshdata <- setupSshKeyPair keypair =<< pairMsgToSshData msg sshdata <- liftIO $ setupSshKeyPair keypair =<< pairMsgToSshData msg
{- Ensure that we know {- Ensure that we know the ssh host key for the host we paired with.
- the ssh host key for the host we paired with.
- If we don't, ssh over to get it. -} - If we don't, ssh over to get it. -}
unlessM (knownHost $ sshHostName sshdata) $ liftIO $ unlessM (knownHost $ sshHostName sshdata) $
void $ sshTranscript void $ sshTranscript
[ sshOpt "StrictHostKeyChecking" "no" [ sshOpt "StrictHostKeyChecking" "no"
, sshOpt "NumberOfPasswordPrompts" "0" , sshOpt "NumberOfPasswordPrompts" "0"
@ -46,7 +42,10 @@ finishedPairing st dstatus scanremotes msg keypair = do
, "git-annex-shell -c configlist " ++ T.unpack (sshDirectory sshdata) , "git-annex-shell -c configlist " ++ T.unpack (sshDirectory sshdata)
] ]
"" ""
void $ makeSshRemote st dstatus scanremotes False sshdata st <- getAssistant threadState
dstatus <- getAssistant daemonStatusHandle
scanremotes <- getAssistant scanRemoteMap
void $ liftIO $ makeSshRemote st dstatus scanremotes False sshdata
{- Mostly a straightforward conversion. Except: {- Mostly a straightforward conversion. Except:
- * Determine the best hostname to use to contact the host. - * Determine the best hostname to use to contact the host.

View file

@ -87,8 +87,8 @@ startSending dstatus pip stage sender = void $ forkIO $ do
where where
stopold = maybe noop killThread . inProgressThreadId stopold = maybe noop killThread . inProgressThreadId
stopSending :: DaemonStatusHandle -> PairingInProgress -> IO () stopSending :: PairingInProgress -> DaemonStatusHandle -> IO ()
stopSending dstatus pip = do stopSending pip dstatus = do
maybe noop killThread $ inProgressThreadId pip maybe noop killThread $ inProgressThreadId pip
modifyDaemonStatus_ dstatus $ \s -> s { pairingInProgress = Nothing } modifyDaemonStatus_ dstatus $ \s -> s { pairingInProgress = Nothing }

View file

@ -11,8 +11,6 @@ import Assistant.Common
import Assistant.Pairing import Assistant.Pairing
import Assistant.Pairing.Network import Assistant.Pairing.Network
import Assistant.Pairing.MakeRemote import Assistant.Pairing.MakeRemote
import Assistant.ThreadedMonad
import Assistant.ScanRemotes
import Assistant.DaemonStatus import Assistant.DaemonStatus
import Assistant.WebApp import Assistant.WebApp
import Assistant.WebApp.Types import Assistant.WebApp.Types
@ -27,117 +25,116 @@ import Data.Char
thisThread :: ThreadName thisThread :: ThreadName
thisThread = "PairListener" thisThread = "PairListener"
pairListenerThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> UrlRenderer -> NamedThread pairListenerThread :: UrlRenderer -> NamedThread
pairListenerThread st dstatus scanremotes urlrenderer = thread $ liftIO $ withSocketsDo $ pairListenerThread urlrenderer = NamedThread "PairListener" $ do
runEvery (Seconds 1) $ void $ tryIO $ do listener <- asIO $ go [] []
sock <- getsock liftIO $ withSocketsDo $
go sock [] [] runEvery (Seconds 1) $ void $ tryIO $
where listener =<< getsock
thread = NamedThread thisThread where
{- Note this can crash if there's no network interface,
- or only one like lo that doesn't support multicast. -}
getsock = multicastReceiver (multicastAddress $ IPv4Addr undefined) pairingPort
{- Note this can crash if there's no network interface, go reqs cache sock = liftIO (getmsg sock []) >>= \msg -> case readish msg of
- or only one like lo that doesn't support multicast. -} Nothing -> go reqs cache sock
getsock = multicastReceiver (multicastAddress $ IPv4Addr undefined) pairingPort Just m -> do
sane <- checkSane msg
(pip, verified) <- verificationCheck m
=<< (pairingInProgress <$> daemonStatus)
let wrongstage = maybe False (\p -> pairMsgStage m <= inProgressPairStage p) pip
case (wrongstage, sane, pairMsgStage m) of
-- ignore our own messages, and
-- out of order messages
(True, _, _) -> go reqs cache sock
(_, False, _) -> go reqs cache sock
(_, _, PairReq) -> if m `elem` reqs
then go reqs (invalidateCache m cache) sock
else do
pairReqReceived verified urlrenderer m
go (m:take 10 reqs) (invalidateCache m cache) sock
(_, _, PairAck) -> do
cache' <- pairAckReceived verified pip m cache
go reqs cache' sock
(_, _, PairDone) -> do
pairDoneReceived verified pip m
go reqs cache sock
{- As well as verifying the message using the shared secret,
- check its UUID against the UUID we have stored. If
- they're the same, someone is sending bogus messages,
- which could be an attempt to brute force the shared secret. -}
verificationCheck _ Nothing = return (Nothing, False)
verificationCheck m (Just pip)
| not verified && sameuuid = do
liftAnnex $ warning
"detected possible pairing brute force attempt; disabled pairing"
stopSending pip <<~ daemonStatusHandle
return (Nothing, False)
|otherwise = return (Just pip, verified && sameuuid)
where
verified = verifiedPairMsg m pip
sameuuid = pairUUID (inProgressPairData pip) == pairUUID (pairMsgData m)
go sock reqs cache = getmsg sock [] >>= \msg -> case readish msg of {- Various sanity checks on the content of the message. -}
Nothing -> go sock reqs cache checkSane msg
Just m -> do {- Control characters could be used in a
sane <- checkSane msg - console poisoning attack. -}
(pip, verified) <- verificationCheck m | any isControl msg || any (`elem` "\r\n") msg = do
=<< (pairingInProgress <$> getDaemonStatus dstatus) liftAnnex $ warning
let wrongstage = maybe False (\p -> pairMsgStage m <= inProgressPairStage p) pip "illegal control characters in pairing message; ignoring"
case (wrongstage, sane, pairMsgStage m) of return False
-- ignore our own messages, and | otherwise = return True
-- out of order messages
(True, _, _) -> go sock reqs cache
(_, False, _) -> go sock reqs cache
(_, _, PairReq) -> if m `elem` reqs
then go sock reqs (invalidateCache m cache)
else do
pairReqReceived verified dstatus urlrenderer m
go sock (m:take 10 reqs) (invalidateCache m cache)
(_, _, PairAck) ->
pairAckReceived verified pip st dstatus scanremotes m cache
>>= go sock reqs
(_, _, PairDone) -> do
pairDoneReceived verified pip st dstatus scanremotes m
go sock reqs cache
{- As well as verifying the message using the shared secret, {- PairReqs invalidate the cache of recently finished pairings.
- check its UUID against the UUID we have stored. If - This is so that, if a new pairing is started with the
- they're the same, someone is sending bogus messages, - same secret used before, a bogus PairDone is not sent. -}
- which could be an attempt to brute force the shared invalidateCache msg = filter (not . verifiedPairMsg msg)
- secret.
-}
verificationCheck m (Just pip) = do
let verified = verifiedPairMsg m pip
let sameuuid = pairUUID (inProgressPairData pip) == pairUUID (pairMsgData m)
if not verified && sameuuid
then do
runThreadState st $
warning "detected possible pairing brute force attempt; disabled pairing"
stopSending dstatus pip
return (Nothing, False)
else return (Just pip, verified && sameuuid)
verificationCheck _ Nothing = return (Nothing, False)
{- Various sanity checks on the content of the message. -}
checkSane msg
{- Control characters could be used in a
- console poisoning attack. -}
| any isControl msg || any (`elem` "\r\n") msg = do
runThreadState st $
warning "illegal control characters in pairing message; ignoring"
return False
| otherwise = return True
{- PairReqs invalidate the cache of recently finished pairings. getmsg sock c = do
- This is so that, if a new pairing is started with the (msg, n, _) <- recvFrom sock chunksz
- same secret used before, a bogus PairDone is not sent. -} if n < chunksz
invalidateCache msg = filter (not . verifiedPairMsg msg) then return $ c ++ msg
else getmsg sock $ c ++ msg
getmsg sock c = do where
(msg, n, _) <- recvFrom sock chunksz chunksz = 1024
if n < chunksz
then return $ c ++ msg
else getmsg sock $ c ++ msg
where
chunksz = 1024
{- Show an alert when a PairReq is seen. -} {- Show an alert when a PairReq is seen. -}
pairReqReceived :: Bool -> DaemonStatusHandle -> UrlRenderer -> PairMsg -> IO () pairReqReceived :: Bool -> UrlRenderer -> PairMsg -> Assistant ()
pairReqReceived True _ _ _ = noop -- ignore our own PairReq pairReqReceived True _ _ = noop -- ignore our own PairReq
pairReqReceived False dstatus urlrenderer msg = do pairReqReceived False urlrenderer msg = do
url <- renderUrl urlrenderer (FinishPairR msg) [] url <- liftIO $ renderUrl urlrenderer (FinishPairR msg) []
void $ addAlert dstatus $ pairRequestReceivedAlert repo dstatus <- getAssistant daemonStatusHandle
liftIO $ void $ addAlert dstatus $ pairRequestReceivedAlert repo
AlertButton AlertButton
{ buttonUrl = url { buttonUrl = url
, buttonLabel = T.pack "Respond" , buttonLabel = T.pack "Respond"
, buttonAction = Just $ removeAlert dstatus , buttonAction = Just $ removeAlert dstatus
} }
where where
repo = pairRepo msg repo = pairRepo msg
{- When a verified PairAck is seen, a host is ready to pair with us, and has {- When a verified PairAck is seen, a host is ready to pair with us, and has
- already configured our ssh key. Stop sending PairReqs, finish the pairing, - already configured our ssh key. Stop sending PairReqs, finish the pairing,
- and send a single PairDone. - and send a single PairDone. -}
-} pairAckReceived :: Bool -> Maybe PairingInProgress -> PairMsg -> [PairingInProgress] -> Assistant [PairingInProgress]
pairAckReceived :: Bool -> Maybe PairingInProgress -> ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PairMsg -> [PairingInProgress] -> IO [PairingInProgress] pairAckReceived True (Just pip) msg cache = do
pairAckReceived True (Just pip) st dstatus scanremotes msg cache = do stopSending pip <<~ daemonStatusHandle
stopSending dstatus pip liftIO $ setupAuthorizedKeys msg
setupAuthorizedKeys msg finishedPairing msg (inProgressSshKeyPair pip)
finishedPairing st dstatus scanremotes msg (inProgressSshKeyPair pip) dstatus <- getAssistant daemonStatusHandle
startSending dstatus pip PairDone $ multicastPairMsg liftIO $ startSending dstatus pip PairDone $ multicastPairMsg
(Just 1) (inProgressSecret pip) (inProgressPairData pip) (Just 1) (inProgressSecret pip) (inProgressPairData pip)
return $ pip : take 10 cache return $ pip : take 10 cache
{- A stale PairAck might also be seen, after we've finished pairing. {- A stale PairAck might also be seen, after we've finished pairing.
- Perhaps our PairDone was not received. To handle this, we keep - Perhaps our PairDone was not received. To handle this, we keep
- a cache of recently finished pairings, and re-send PairDone in - a cache of recently finished pairings, and re-send PairDone in
- response to stale PairAcks for them. -} - response to stale PairAcks for them. -}
pairAckReceived _ _ _ dstatus _ msg cache = do pairAckReceived _ _ msg cache = do
let pips = filter (verifiedPairMsg msg) cache let pips = filter (verifiedPairMsg msg) cache
dstatus <- getAssistant daemonStatusHandle
unless (null pips) $ unless (null pips) $
forM_ pips $ \pip -> liftIO $ forM_ pips $ \pip ->
startSending dstatus pip PairDone $ multicastPairMsg startSending dstatus pip PairDone $ multicastPairMsg
(Just 1) (inProgressSecret pip) (inProgressPairData pip) (Just 1) (inProgressSecret pip) (inProgressPairData pip)
return cache return cache
@ -151,9 +148,9 @@ pairAckReceived _ _ _ dstatus _ msg cache = do
- entering the secret. Would be better to start a fresh pair request in this - entering the secret. Would be better to start a fresh pair request in this
- situation. - situation.
-} -}
pairDoneReceived :: Bool -> Maybe PairingInProgress -> ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PairMsg -> IO () pairDoneReceived :: Bool -> Maybe PairingInProgress -> PairMsg -> Assistant ()
pairDoneReceived False _ _ _ _ _ = noop -- not verified pairDoneReceived False _ _ = noop -- not verified
pairDoneReceived True Nothing _ _ _ _ = noop -- not in progress pairDoneReceived True Nothing _ = noop -- not in progress
pairDoneReceived True (Just pip) st dstatus scanremotes msg = do pairDoneReceived True (Just pip) msg = do
stopSending dstatus pip stopSending pip <<~ daemonStatusHandle
finishedPairing st dstatus scanremotes msg (inProgressSshKeyPair pip) finishedPairing msg (inProgressSshKeyPair pip)

View file

@ -8,7 +8,6 @@
module Assistant.Threads.Transferrer where module Assistant.Threads.Transferrer where
import Assistant.Common import Assistant.Common
import Assistant.ThreadedMonad
import Assistant.DaemonStatus import Assistant.DaemonStatus
import Assistant.TransferQueue import Assistant.TransferQueue
import Assistant.TransferSlots import Assistant.TransferSlots
@ -23,75 +22,78 @@ import Locations.UserConfig
import System.Process (create_group) import System.Process (create_group)
thisThread :: ThreadName
thisThread = "Transferrer"
{- For now only one transfer is run at a time. -} {- For now only one transfer is run at a time. -}
maxTransfers :: Int maxTransfers :: Int
maxTransfers = 1 maxTransfers = 1
{- Dispatches transfers from the queue. -} {- Dispatches transfers from the queue. -}
transfererThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> TransferSlots -> CommitChan -> NamedThread transfererThread :: NamedThread
transfererThread st dstatus transferqueue slots commitchan = thread $ liftIO $ go =<< readProgramFile transfererThread = NamedThread "Transferr" $ do
where program <- liftIO readProgramFile
thread = NamedThread thisThread transferqueue <- getAssistant transferQueue
go program = forever $ inTransferSlot dstatus slots $ dstatus <- getAssistant daemonStatusHandle
maybe (return Nothing) (uncurry $ startTransfer st dstatus commitchan program) slots <- getAssistant transferSlots
=<< getNextTransfer transferqueue dstatus notrunning starter <- asIO2 $ startTransfer program
{- Skip transfers that are already running. -} liftIO $ forever $ inTransferSlot dstatus slots $
notrunning = isNothing . startedTime maybe (return Nothing) (uncurry starter)
=<< getNextTransfer transferqueue dstatus notrunning
where
{- Skip transfers that are already running. -}
notrunning = isNothing . startedTime
{- By the time this is called, the daemonstatus's transfer map should {- By the time this is called, the daemonstatus's transfer map should
- already have been updated to include the transfer. -} - already have been updated to include the transfer. -}
startTransfer :: ThreadState -> DaemonStatusHandle -> CommitChan -> FilePath -> Transfer -> TransferInfo -> TransferGenerator startTransfer :: FilePath -> Transfer -> TransferInfo -> Assistant (Maybe (Transfer, TransferInfo, IO ()))
startTransfer st dstatus commitchan program t info = case (transferRemote info, associatedFile info) of startTransfer program t info = case (transferRemote info, associatedFile info) of
(Just remote, Just file) -> ifM (runThreadState st $ shouldTransfer t info) (Just remote, Just file) -> ifM (liftAnnex $ shouldTransfer t info)
( do ( do
brokendebug thisThread [ "Transferring:" , show t ] debug [ "Transferring:" , show t ]
notifyTransfer dstatus notifyTransfer <<~ daemonStatusHandle
return $ Just (t, info, transferprocess remote file) tp <- asIO2 transferprocess
return $ Just (t, info, tp remote file)
, do , do
brokendebug thisThread [ "Skipping unnecessary transfer:" , show t ] debug [ "Skipping unnecessary transfer:" , show t ]
void $ removeTransfer dstatus t void $ flip removeTransfer t <<~ daemonStatusHandle
return Nothing return Nothing
) )
_ -> return Nothing _ -> return Nothing
where where
direction = transferDirection t direction = transferDirection t
isdownload = direction == Download isdownload = direction == Download
transferprocess remote file = void $ do transferprocess remote file = void $ do
(_, _, _, pid) (_, _, _, pid)
<- createProcess (proc program $ toCommand params) <- liftIO $ createProcess (proc program $ toCommand params)
{ create_group = True } { create_group = True }
{- Alerts are only shown for successful transfers. {- Alerts are only shown for successful transfers.
- Transfers can temporarily fail for many reasons, - Transfers can temporarily fail for many reasons,
- so there's no point in bothering the user about - so there's no point in bothering the user about
- those. The assistant should recover. - those. The assistant should recover.
- -
- Also, after a successful transfer, the location - Also, after a successful transfer, the location
- log has changed. Indicate that a commit has been - log has changed. Indicate that a commit has been
- made, in order to queue a push of the git-annex - made, in order to queue a push of the git-annex
- branch out to remotes that did not participate - branch out to remotes that did not participate
- in the transfer. - in the transfer.
-} -}
whenM ((==) ExitSuccess <$> waitForProcess pid) $ do whenM (liftIO $ (==) ExitSuccess <$> waitForProcess pid) $ do
void $ addAlert dstatus $ dstatus <- getAssistant daemonStatusHandle
makeAlertFiller True $ liftIO $ void $ addAlert dstatus $
transferFileAlert direction True file makeAlertFiller True $
recordCommit commitchan transferFileAlert direction True file
where recordCommit <<~ commitChan
params = where
[ Param "transferkey" params =
, Param "--quiet" [ Param "transferkey"
, Param $ key2file $ transferKey t , Param "--quiet"
, Param $ if isdownload , Param $ key2file $ transferKey t
then "--from" , Param $ if isdownload
else "--to" then "--from"
, Param $ Remote.name remote else "--to"
, Param "--file" , Param $ Remote.name remote
, File file , Param "--file"
] , File file
]
{- Checks if the file to download is already present, or the remote {- Checks if the file to download is already present, or the remote
- being uploaded to isn't known to have the file. -} - being uploaded to isn't known to have the file. -}

View file

@ -78,6 +78,9 @@ getAssistantY f = f <$> (assistantData <$> getYesod)
getDaemonStatusY :: forall sub. GHandler sub WebApp DaemonStatus getDaemonStatusY :: forall sub. GHandler sub WebApp DaemonStatus
getDaemonStatusY = liftIO . getDaemonStatus =<< getAssistantY daemonStatusHandle getDaemonStatusY = liftIO . getDaemonStatus =<< getAssistantY daemonStatusHandle
runAssistantY :: forall sub a. (Assistant a) -> GHandler sub WebApp a
runAssistantY a = liftIO . runAssistant a =<< assistantData <$> getYesod
getWebAppState :: forall sub. GHandler sub WebApp WebAppState getWebAppState :: forall sub. GHandler sub WebApp WebAppState
getWebAppState = liftIO . atomically . readTMVar =<< webAppState <$> getYesod getWebAppState = liftIO . atomically . readTMVar =<< webAppState <$> getYesod

View file

@ -129,14 +129,12 @@ startTransfer t = do
(\i -> i { transferPaused = False }) (\i -> i { transferPaused = False })
dstatus dstatus
throwTo tid ResumeTransfer throwTo tid ResumeTransfer
start info = do start info = runAssistantY $ do
st <- getAssistantY threadState program <- liftIO readProgramFile
dstatus <- getAssistantY daemonStatusHandle dstatus <- getAssistant daemonStatusHandle
slots <- getAssistantY transferSlots slots <- getAssistant transferSlots
commitchan <- getAssistantY commitChan inImmediateTransferSlot dstatus slots <~>
liftIO $ inImmediateTransferSlot dstatus slots $ do Transferrer.startTransfer program t info
program <- readProgramFile
Transferrer.startTransfer st dstatus commitchan program t info
getCurrentTransfers :: Handler TransferMap getCurrentTransfers :: Handler TransferMap
getCurrentTransfers = currentTransfers <$> getDaemonStatusY getCurrentTransfers = currentTransfers <$> getDaemonStatusY