git-annex/P2P/Annex.hs
Joey Hess 9b0dde834e
convert getFileSize to RawFilePath
Lots of nice wins from this in avoiding unncessary work, and I think
nothing got slower.

This commit was sponsored by Boyd Stephen Smith Jr. on Patreon.
2020-11-05 11:32:57 -04:00

222 lines
7.2 KiB
Haskell

{- P2P protocol, Annex implementation
-
- Copyright 2016-2018 Joey Hess <id@joeyh.name>
-
- Licensed under the GNU AGPL version 3 or higher.
-}
{-# LANGUAGE RankNTypes, FlexibleContexts #-}
module P2P.Annex
( RunState(..)
, mkRunState
, P2PConnection(..)
, runFullProto
) where
import Annex.Common
import Annex.Content
import Annex.Transfer
import Annex.ChangedRefs
import P2P.Protocol
import P2P.IO
import Logs.Location
import Types.NumCopies
import Utility.Metered
import Control.Monad.Free
import Control.Concurrent.STM
-- Full interpreter for Proto, that can receive and send objects.
runFullProto :: RunState -> P2PConnection -> Proto a -> Annex (Either ProtoFailure a)
runFullProto runst conn = go
where
go :: RunProto Annex
go (Pure v) = return (Right v)
go (Free (Net n)) = runNet runst conn go n
go (Free (Local l)) = runLocal runst go l
runLocal :: RunState -> RunProto Annex -> LocalF (Proto a) -> Annex (Either ProtoFailure a)
runLocal runst runner a = case a of
TmpContentSize k next -> do
tmp <- fromRepo $ gitAnnexTmpObjectLocation k
size <- liftIO $ catchDefaultIO 0 $ getFileSize tmp
runner (next (Len size))
FileSize f next -> do
size <- liftIO $ catchDefaultIO 0 $ getFileSize (toRawFilePath f)
runner (next (Len size))
ContentSize k next -> do
let getsize = liftIO . catchMaybeIO . getFileSize
size <- inAnnex' isJust Nothing getsize k
runner (next (Len <$> size))
ReadContent k af o sender next -> do
let proceed c = do
r <- tryNonAsync c
case r of
Left e -> return $ Left $ ProtoFailureException e
Right (Left e) -> return $ Left e
Right (Right ok) -> runner (next ok)
-- If the content is not present, or the transfer doesn't
-- run for any other reason, the sender action still must
-- be run, so is given empty and Invalid data.
let fallback = runner (sender mempty (return Invalid))
v <- tryNonAsync $ prepSendAnnex k
case v of
Right (Just (f, checkchanged)) -> proceed $ do
-- alwaysUpload to allow multiple uploads of the same key.
let runtransfer ti = transfer alwaysUpload k af $ \p ->
sinkfile f o checkchanged sender p ti
checktransfer runtransfer fallback
Right Nothing -> proceed fallback
Left e -> return $ Left $ ProtoFailureException e
StoreContent k af o l getb validitycheck next -> do
-- This is the same as the retrievalSecurityPolicy of
-- Remote.P2P and Remote.Git.
let rsp = RetrievalAllKeysSecure
v <- tryNonAsync $ do
let runtransfer ti =
Right <$> transfer download k af (\p ->
getViaTmp rsp DefaultVerify k $ \tmp ->
storefile (fromRawFilePath tmp) o l getb validitycheck p ti)
let fallback = return $ Left $
ProtoFailureMessage "transfer already in progress, or unable to take transfer lock"
checktransfer runtransfer fallback
case v of
Left e -> return $ Left $ ProtoFailureException e
Right (Left e) -> return $ Left e
Right (Right ok) -> runner (next ok)
StoreContentTo dest o l getb validitycheck next -> do
v <- tryNonAsync $ do
let runtransfer ti = Right
<$> storefile dest o l getb validitycheck nullMeterUpdate ti
let fallback = return $ Left $
ProtoFailureMessage "transfer failed"
checktransfer runtransfer fallback
case v of
Left e -> return $ Left $ ProtoFailureException e
Right (Left e) -> return $ Left e
Right (Right ok) -> runner (next ok)
SetPresent k u next -> do
v <- tryNonAsync $ logChange k u InfoPresent
case v of
Left e -> return $ Left $ ProtoFailureException e
Right () -> runner next
CheckContentPresent k next -> do
v <- tryNonAsync $ inAnnex k
case v of
Left e -> return $ Left $ ProtoFailureException e
Right result -> runner (next result)
RemoveContent k next -> do
let cleanup = do
logStatus k InfoMissing
return True
v <- tryNonAsync $
ifM (Annex.Content.inAnnex k)
( lockContentForRemoval k cleanup $ \contentlock -> do
removeAnnex contentlock
cleanup
, return True
)
case v of
Left e -> return $ Left $ ProtoFailureException e
Right result -> runner (next result)
TryLockContent k protoaction next -> do
v <- tryNonAsync $ lockContentShared k $ \verifiedcopy ->
case verifiedcopy of
LockedCopy _ -> runner (protoaction True)
_ -> runner (protoaction False)
-- If locking fails, lockContentShared throws an exception.
-- Let the peer know it failed.
case v of
Left _ -> runner $ do
protoaction False
next
Right _ -> runner next
WaitRefChange next -> case runst of
Serving _ (Just h) _ -> do
v <- tryNonAsync $ liftIO $ waitChangedRefs h
case v of
Left e -> return $ Left $ ProtoFailureException e
Right changedrefs -> runner (next changedrefs)
_ -> return $ Left $
ProtoFailureMessage "change notification not available"
UpdateMeterTotalSize m sz next -> do
liftIO $ setMeterTotalSize m sz
runner next
RunValidityCheck checkaction next -> runner . next =<< checkaction
where
transfer mk k af ta = case runst of
-- Update transfer logs when serving.
-- Using noRetry because we're the sender.
Serving theiruuid _ _ ->
mk theiruuid k af noRetry ta noNotification
-- Transfer logs are updated higher in the stack when
-- a client.
Client _ -> ta nullMeterUpdate
storefile dest (Offset o) (Len l) getb validitycheck p ti = do
let p' = offsetMeterUpdate p (toBytesProcessed o)
v <- runner getb
case v of
Right b -> do
liftIO $ withBinaryFile dest ReadWriteMode $ \h -> do
when (o /= 0) $
hSeek h AbsoluteSeek o
meteredWrite p' h b
indicatetransferred ti
rightsize <- do
sz <- liftIO $ getFileSize (toRawFilePath dest)
return (toInteger sz == l + o)
runner validitycheck >>= \case
Right (Just Valid) ->
return (rightsize, UnVerified)
_ -> do
-- Invalid, or old protocol
-- version. Validity is not
-- known. Force content
-- verification.
return (rightsize, MustVerify)
Left e -> error $ describeProtoFailure e
sinkfile f (Offset o) checkchanged sender p ti = bracket setup cleanup go
where
setup = liftIO $ openBinaryFile f ReadMode
cleanup = liftIO . hClose
go h = do
let p' = offsetMeterUpdate p (toBytesProcessed o)
when (o /= 0) $
liftIO $ hSeek h AbsoluteSeek o
b <- liftIO $ hGetContentsMetered h p'
let validitycheck = local $ runValidityCheck $
checkchanged >>= return . \case
False -> Invalid
True -> Valid
r <- runner (sender b validitycheck)
indicatetransferred ti
return r
-- This allows using actions like download and viaTmp
-- that may abort a transfer, and clean up the protocol after them.
--
-- Runs an action that may make a transfer, passing a transfer
-- indicator. The action should call indicatetransferred on it,
-- only after it's actually sent/received the all data.
--
-- If the action ends without having called indicatetransferred,
-- runs the fallback action, which can close the protoocol
-- connection or otherwise clean up after the transfer not having
-- occurred.
--
-- If the action throws an exception, the fallback is not run.
checktransfer ta fallback = do
ti <- liftIO $ newTVarIO False
r <- ta ti
ifM (liftIO $ atomically $ readTVar ti)
( return r
, fallback
)
indicatetransferred ti = liftIO $ atomically $ writeTVar ti True