git-annex/P2P/Annex.hs

218 lines
6.8 KiB
Haskell
Raw Normal View History

{- P2P protocol, Annex implementation
-
- Copyright 2016 Joey Hess <id@joeyh.name>
-
- Licensed under the GNU GPL version 3 or higher.
-}
{-# LANGUAGE RankNTypes, FlexibleContexts #-}
module P2P.Annex
( RunMode(..)
2016-12-06 19:40:31 +00:00
, P2PConnection(..)
, runFullProto
, unusedPeerRemoteName
, linkAddress
) where
import Annex.Common
import Annex.Content
import Annex.Transfer
import Annex.ChangedRefs
import P2P.Protocol
import P2P.IO
import P2P.Address
import P2P.Auth
import Logs.Location
import Types.NumCopies
import Utility.Metered
import qualified Git.Command
import qualified Annex
import Annex.UUID
import Git.Types (RemoteName, remoteName, remotes)
import Config
import Control.Monad.Free
-- | The role in which the protocol interpreter is being run.
data RunMode
    -- | Serving a peer. The UUID is the one passed to the transfer
    -- functions when updating transfer logs; the optional
    -- ChangedRefsHandle, when present, is what WaitRefChange waits on.
    = Serving UUID (Maybe ChangedRefsHandle)
    -- | Acting as a client. Transfer logs are not updated by the
    -- interpreter in this mode.
    | Client
-- | Complete interpreter for 'Proto': able to both send and receive
-- objects.
--
-- Network actions are delegated to 'runNet' over the given connection,
-- local actions to 'runLocal'; in both cases this interpreter is passed
-- along so that the continuation of each action is run the same way.
-- A finished computation yields its value as a 'Right'.
runFullProto :: RunMode -> P2PConnection -> Proto a -> Annex (Either String a)
runFullProto runmode conn = interpret
  where
    interpret :: RunProto Annex
    interpret proto = case proto of
        Pure result -> return (Right result)
        Free (Net netaction) -> runNet conn interpret netaction
        Free (Local localaction) -> runLocal runmode interpret localaction
-- | Interpreter for the local (non-network) actions of the P2P protocol.
--
-- Each 'LocalF' action is carried out in the Annex monad, and the
-- continuation it carries is then handed to @runner@ to interpret the
-- rest of the protocol. Failures are reported as @Left@ with a
-- description of the error.
--
-- The 'RunMode' decides whether transfer logs are updated here
-- (when 'Serving') and whether 'WaitRefChange' is available.
runLocal :: RunMode -> RunProto Annex -> LocalF (Proto a) -> Annex (Either String a)
runLocal runmode runner a = case a of
    -- Size of the key's temporary object file; 0 is used as the
    -- default when getting the size fails.
    TmpContentSize k next -> do
        tmp <- fromRepo $ gitAnnexTmpObjectLocation k
        size <- liftIO $ catchDefaultIO 0 $ getFileSize tmp
        runner (next (Len size))
    -- Size of an arbitrary file, with the same default of 0.
    FileSize f next -> do
        size <- liftIO $ catchDefaultIO 0 $ getFileSize f
        runner (next (Len size))
    -- Size of the key's content, or Nothing.
    ContentSize k next -> do
        let getsize = liftIO . catchMaybeIO . getFileSize
        size <- inAnnex' isJust Nothing getsize k
        runner (next (Len <$> size))
    ReadContent k af o sender next -> do
        v <- tryNonAsync $ prepSendAnnex k
        case v of
            -- The check can detect if the file
            -- changed while it was transferred, but we don't
            -- use it. Instead, the receiving peer must
            -- AlwaysVerify the content it receives.
            Right (Just (f, _check)) -> do
                v' <- tryNonAsync $
                    transfer upload k af $
                        sinkfile f o sender
                case v' of
                    Left e -> return (Left (show e))
                    -- This is already a String error produced by
                    -- the runner; pass it through unchanged.
                    -- Applying show to it would wrap the message
                    -- in quotes and escape its contents.
                    Right (Left e) -> return (Left e)
                    Right (Right ok) -> runner (next ok)
            -- content not available
            Right Nothing -> runner (next False)
            Left e -> return (Left (show e))
    -- Receive content for a key into the annex. Any non-async
    -- exception is reported to the peer as a failed store.
    StoreContent k af o l getb next -> do
        ok <- flip catchNonAsync (const $ return False) $
            transfer download k af $ \p ->
                getViaTmp AlwaysVerify k $ \tmp ->
                    unVerified $ storefile tmp o l getb p
        runner (next ok)
    -- Receive content directly into the given destination file.
    StoreContentTo dest o l getb next -> do
        ok <- flip catchNonAsync (const $ return False) $
            storefile dest o l getb nullMeterUpdate
        runner (next ok)
    SetPresent k u next -> do
        v <- tryNonAsync $ logChange k u InfoPresent
        case v of
            Left e -> return (Left (show e))
            Right () -> runner next
    CheckContentPresent k next -> do
        v <- tryNonAsync $ inAnnex k
        case v of
            Left e -> return (Left (show e))
            Right result -> runner (next result)
    -- Removal yields True both when the content was removed and
    -- when it was not present to begin with.
    RemoveContent k next -> do
        v <- tryNonAsync $
            ifM (Annex.Content.inAnnex k)
                ( lockContentForRemoval k $ \contentlock -> do
                    removeAnnex contentlock
                    logStatus k InfoMissing
                    return True
                , return True
                )
        case v of
            Left e -> return (Left (show e))
            Right result -> runner (next result)
    TryLockContent k protoaction next -> do
        v <- tryNonAsync $ lockContentShared k $ \verifiedcopy ->
            case verifiedcopy of
                LockedCopy _ -> runner (protoaction True)
                _ -> runner (protoaction False)
        -- If locking fails, lockContentShared throws an exception.
        -- Let the peer know it failed.
        case v of
            Left _ -> runner $ do
                protoaction False
                next
            Right _ -> runner next
    -- Only available when serving with a ChangedRefsHandle.
    WaitRefChange next -> case runmode of
        Serving _ (Just h) -> do
            v <- tryNonAsync $ liftIO $ waitChangedRefs h
            case v of
                Left e -> return (Left (show e))
                Right changedrefs -> runner (next changedrefs)
        _ -> return $ Left "change notification not available"
    -- Any failure to link is reported to the peer as False.
    AddLinkToPeer addr next -> do
        v <- tryNonAsync $ do
            -- Flood protection; don't let a huge number
            -- of peer remotes be created.
            ns <- usedPeerRemoteNames
            if length ns > 100
                then return $ Right False
                else linkAddress addr [] =<< unusedPeerRemoteName
        case v of
            Right (Right r) -> runner (next r)
            _ -> runner (next False)
  where
    -- Runs a transfer action, given the transfer function to use
    -- (upload or download) when logging is wanted.
    transfer mk k af ta = case runmode of
        -- Update transfer logs when serving.
        Serving theiruuid _ ->
            mk theiruuid k af noRetry ta noNotification
        -- Transfer logs are updated higher in the stack when
        -- a client.
        Client -> ta nullMeterUpdate

    -- Runs the protocol action that gets the data, and writes the
    -- data to the destination file starting at the offset. The result
    -- is whether the file's size afterwards equals offset plus the
    -- expected length. A protocol failure is thrown as an error,
    -- which the callers above catch and turn into False.
    storefile dest (Offset o) (Len l) getb p = do
        let p' = offsetMeterUpdate p (toBytesProcessed o)
        v <- runner getb
        case v of
            Right b -> liftIO $ do
                withBinaryFile dest ReadWriteMode $ \h -> do
                    when (o /= 0) $
                        hSeek h AbsoluteSeek o
                    meteredWrite p' h b
                sz <- getFileSize dest
                return (toInteger sz == l + o)
            Left e -> error e

    -- Reads the file from the offset onward and passes its content
    -- to the sender protocol action. The handle stays open until the
    -- runner has finished with the sender, and is closed afterwards.
    sinkfile f (Offset o) sender p = bracket setup cleanup go
      where
        setup = liftIO $ openBinaryFile f ReadMode
        cleanup = liftIO . hClose
        go h = do
            let p' = offsetMeterUpdate p (toBytesProcessed o)
            when (o /= 0) $
                liftIO $ hSeek h AbsoluteSeek o
            b <- liftIO $ hGetContentsMetered h p'
            runner (sender b)
-- | Picks a remote name of the form @peerN@ that is not already in use,
-- choosing the smallest N, counting up from 1.
unusedPeerRemoteName :: Annex RemoteName
unusedPeerRemoteName = firstfree <$> usedPeerRemoteNames
  where
    mkname :: Integer -> RemoteName
    mkname n = "peer" ++ show n

    -- Step through the candidate numbers until one yields a name
    -- that is not taken.
    firstfree taken = mkname $ until ((`notElem` taken) . mkname) (+ 1) 1
-- | Names of the git repository's remotes that start with @peer@.
-- Remotes that have no name are skipped.
usedPeerRemoteNames :: Annex [RemoteName]
usedPeerRemoteNames = do
    repo <- Annex.gitRepo
    return
        [ name
        | Just name <- map remoteName (remotes repo)
        , "peer" `isPrefixOf` name
        ]
-- | Links with the peer at the given address, by adding a git remote
-- for it under the given name.
--
-- After connecting, authenticates with the peer, and asks it to link
-- back to each of the @linkbackto@ addresses. When that succeeds, the
-- remote is added with @git remote add@, and the peer's UUID and the
-- auth token are stored.
--
-- Returns @Right True@ when the link was set up, @Right False@ when
-- adding the git remote failed, and @Left@ with a message for the user
-- when connecting or authenticating failed, or when the peer could not
-- link back to any of the requested addresses.
linkAddress :: P2PAddressAuth -> [P2PAddressAuth] -> RemoteName -> Annex (Either String Bool)
linkAddress (P2PAddressAuth addr authtoken) linkbackto remotename = do
    g <- Annex.gitRepo
    liftIO (tryNonAsync (connectPeer g addr))
        >>= either connectfailed handshake
  where
    connectfailed e = return $ Left $ "Unable to connect with peer. Please check that the peer is connected to the network, and try again. (" ++ show e ++ ")"

    -- NOTE(review): the connection is not explicitly closed anywhere
    -- in this function; confirm whether that is handled elsewhere.
    handshake conn = do
        u <- getUUID
        result <- liftIO $ runNetProto conn $ do
            authresp <- P2P.Protocol.auth u authtoken
            lbok <- forM linkbackto $ P2P.Protocol.link
            return (authresp, lbok)
        case result of
            Left e -> return $ Left $ "Unable to authenticate with peer: " ++ e
            Right (Nothing, _) -> return $ Left "Unable to authenticate with peer. Please check the address and try again."
            Right (Just theiruuid, lbok) -> addremote theiruuid lbok

    addremote theiruuid lbok = do
        added <- inRepo $ Git.Command.runBool
            [ Param "remote", Param "add"
            , Param remotename
            , Param (formatP2PAddress addr)
            ]
        if added
            then do
                storeUUIDIn (remoteConfig remotename "uuid") theiruuid
                storeP2PRemoteAuthToken addr authtoken
                -- With nothing to link back to, there is nothing
                -- that could have failed; otherwise at least one
                -- link back must have succeeded.
                return $ if null linkbackto || or lbok
                    then Right True
                    else Left "Linked with peer. However, the peer was unable to link back to us, so the link is one-way."
            else return $ Right False