git-annex/P2P/Annex.hs
Joey Hess 8b5fc94d50
add optional object file location to storeKey
This will be used by the next commit to simplify the proxy.
2024-07-01 10:42:27 -04:00

269 lines
8.8 KiB
Haskell

{- P2P protocol, Annex implementation
-
- Copyright 2016-2022 Joey Hess <id@joeyh.name>
-
- Licensed under the GNU AGPL version 3 or higher.
-}
{-# LANGUAGE RankNTypes, FlexibleContexts #-}
module P2P.Annex
( RunState(..)
, mkRunState
, P2PConnection(..)
, runFullProto
) where
import Annex.Common
import Annex.Content
import Annex.Transfer
import Annex.ChangedRefs
import P2P.Protocol
import P2P.IO
import Logs.Location
import Types.NumCopies
import Utility.Metered
import Annex.Verify
import Control.Monad.Free
import Control.Concurrent.STM
import qualified Data.ByteString as S
-- Full interpreter for Proto, that can receive and send objects.
--
-- Walks the free monad one step at a time: a finished computation yields
-- its value, a network action is handed to runNet, and a local action is
-- handed to runLocal. Both are given this same interpreter so that they
-- can continue running the rest of the protocol.
runFullProto :: RunState -> P2PConnection -> Proto a -> Annex (Either ProtoFailure a)
runFullProto runst conn = interpret
  where
    interpret :: RunProto Annex
    interpret proto = case proto of
        Pure result -> return (Right result)
        Free (Net netaction) -> runNet runst conn interpret netaction
        Free (Local localaction) -> runLocal runst interpret localaction
-- Interpreter for a single local action of the protocol.
--
-- Runs the action in the Annex monad, and then uses the provided runner
-- to continue with the rest of the protocol. For most actions, a
-- non-async exception thrown while running it is caught and returned as
-- a ProtoFailureException, rather than being propagated.
runLocal :: RunState -> RunProto Annex -> LocalF (Proto a) -> Annex (Either ProtoFailure a)
runLocal runst runner a = case a of
    TmpContentSize k next ->
        fromRepo (gitAnnexTmpObjectLocation k)
            >>= sizeorzero
            >>= runner . next . Len
    FileSize f next ->
        sizeorzero (toRawFilePath f) >>= runner . next . Len
    ContentSize k next -> do
        msize <- inAnnex' isJust Nothing (liftIO . catchMaybeIO . getFileSize) k
        runner $ next $ fmap Len msize
    ReadContent k af o offset sender next -> do
        let proceed c = tryNonAsync c >>= continuewith next
        -- If the content is not present, or the transfer doesn't
        -- run for any other reason, the sender action still must
        -- be run, so it is given empty data and Invalid.
        let fallback = runner (sender mempty (return Invalid))
        tryNonAsync (prepSendAnnex k o) >>= \case
            Left e -> exceptionfailure e
            Right Nothing -> proceed fallback
            Right (Just (f, _sz, checkchanged)) -> proceed $
                -- alwaysUpload to allow multiple uploads of the same key.
                checktransfer
                    (\ti -> transfer alwaysUpload k af Nothing $ \p ->
                        sinkfile f offset checkchanged sender p ti)
                    fallback
    StoreContent k af o l getb validitycheck next -> do
        -- This is the same as the retrievalSecurityPolicy of
        -- Remote.P2P and Remote.Git.
        let rsp = RetrievalAllKeysSecure
        v <- tryNonAsync $ do
            iv <- startVerifyKeyContentIncrementally DefaultVerify k
            let receive p ti = logStatusAfter k $
                    getViaTmp rsp DefaultVerify k af Nothing $ \tmp ->
                        storefile (fromRawFilePath tmp) o l getb iv validitycheck p ti
            checktransfer
                (\ti -> Right <$> transfer download' k af Nothing (\p -> receive p ti))
                (messagefailure "transfer already in progress, or unable to take transfer lock")
        continuewith next v
    StoreContentTo dest iv o l getb validitycheck next -> do
        v <- tryNonAsync $ checktransfer
            (\ti -> Right <$> storefile dest o l getb iv validitycheck nullMeterUpdate ti)
            (messagefailure "Transfer failed")
        continuewith next v
    SetPresent k u next ->
        tryNonAsync (logChange k u InfoPresent)
            >>= either exceptionfailure (\() -> runner next)
    CheckContentPresent k next ->
        tryNonAsync (inAnnex k)
            >>= either exceptionfailure (runner . next)
    RemoveContent k next -> do
        -- Records that the content is gone. Also used as the action
        -- passed to lockContentForRemoval alongside the removal itself.
        let markmissing = logStatus k InfoMissing >> return True
        let dropcontent = lockContentForRemoval k markmissing $ \contentlock ->
                removeAnnex contentlock >> markmissing
        v <- tryNonAsync $
            ifM (Annex.Content.inAnnex k)
                ( dropcontent
                -- Content that is not present counts as removed.
                , return True
                )
        either exceptionfailure (runner . next) v
    TryLockContent k protoaction next -> do
        v <- tryNonAsync $ lockContentShared k $ \case
            LockedCopy _ -> runner (protoaction True)
            _ -> runner (protoaction False)
        -- If locking fails, lockContentShared throws an exception.
        -- Let the peer know it failed.
        runner $ case v of
            Left _ -> protoaction False >> next
            Right _ -> next
    WaitRefChange next -> case runst of
        Serving _ (Just h) _ ->
            tryNonAsync (liftIO $ waitChangedRefs h)
                >>= either exceptionfailure (runner . next)
        _ -> messagefailure "change notification not available"
    UpdateMeterTotalSize m sz next ->
        liftIO (setMeterTotalSize m sz) >> runner next
    RunValidityCheck checkaction next -> do
        validity <- checkaction
        runner (next validity)
  where
    -- Ends the protocol, reporting a caught exception as the failure.
    exceptionfailure e = return $ Left $ ProtoFailureException e

    -- Ends the protocol, reporting a failure with the given message.
    messagefailure msg = return $ Left $ ProtoFailureMessage msg

    -- Given the outcome of an action that was run under tryNonAsync and
    -- that itself may report a protocol failure: either failure ends the
    -- protocol, while a successful result is passed on to the rest of
    -- the protocol.
    continuewith next v = case v of
        Left e -> exceptionfailure e
        Right (Left e) -> return $ Left e
        Right (Right ok) -> runner (next ok)

    -- Size of a file, or 0 when getting its size fails.
    sizeorzero f = liftIO $ catchDefaultIO 0 $ getFileSize f

    transfer mk k af sd action = case runst of
        -- Transfer logs are updated higher in the stack when
        -- a client.
        Client _ -> action nullMeterUpdate
        -- Update transfer logs when serving.
        -- Using noRetry because we're the sender.
        Serving theiruuid _ _ ->
            mk theiruuid k af sd noRetry action noNotification

    -- Prepares an open handle for writing starting at the offset, and
    -- returns the meter update action to use for the data that follows.
    --
    -- With an incremental verifier, the part of the file before the
    -- offset is read back and fed to the verifier, and the meter is
    -- returned as-is. Without one, the meter is adjusted by the offset.
    resumefromoffset o incrementalverifier p h
        | o == 0 = return p
        | otherwise = do
            p' <- case incrementalverifier of
                Nothing -> return $ offsetMeterUpdate p (toBytesProcessed o)
                Just iv -> feedexisting iv o >> return p
            -- Make sure the handle is seeked to the offset.
            -- (Reading the file probably left it there
            -- when that was done, but let's be sure.)
            hSeek h AbsoluteSeek o
            return p'
      where
        -- Reads up to the given number of bytes from the handle, in
        -- chunks of at most defaultChunkSize, feeding each chunk to
        -- the verifier. Stops early once a read comes back empty.
        feedexisting iv remaining = unless (remaining == 0) $ do
            let chunksize
                    | remaining > fromIntegral defaultChunkSize = defaultChunkSize
                    | otherwise = fromIntegral remaining
            chunk <- S.hGet h chunksize
            updateIncrementalVerifier iv chunk
            unless (S.null chunk) $
                feedexisting iv (remaining - fromIntegral (S.length chunk))

    -- Receives data from the peer and writes it to dest, starting at
    -- the offset. Returns whether the content was received ok, along
    -- with how much its verification can be relied on.
    --
    -- Throws an exception (via giveup) when getting the data fails.
    storefile dest (Offset o) (Len l) getb incrementalverifier validitycheck p ti =
        runner getb >>= \case
            Left e -> giveup $ describeProtoFailure e
            Right b -> do
                liftIO $ withBinaryFile dest ReadWriteMode $ \h -> do
                    p' <- resumefromoffset o incrementalverifier p h
                    meteredWrite p' (writechunk h) b
                indicatetransferred ti
                sz <- liftIO $ getFileSize (toRawFilePath dest)
                -- The file should now hold the part that was
                -- already present, plus what was just received.
                let rightsize = toInteger sz == l + o
                runner validitycheck >>= \case
                    Right (Just Valid) -> case incrementalverifier of
                        Nothing -> return (rightsize, UnVerified)
                        Just iv
                            | not rightsize -> return (False, UnVerified)
                            | otherwise -> liftIO (finalizeIncrementalVerifier iv) >>= \case
                                Nothing -> return (True, UnVerified)
                                Just True -> return (True, Verified)
                                Just False -> do
                                    verificationOfContentFailed (toRawFilePath dest)
                                    return (False, UnVerified)
                    -- Special case, for when content was not
                    -- available to send, which is indicated by
                    -- sending 0 bytes and Invalid.
                    Right (Just Invalid) | l == 0 ->
                        return (False, UnVerified)
                    -- Invalid, or old protocol version.
                    -- Validity is not known. Force content
                    -- verification.
                    _ -> return (rightsize, MustVerify)
      where
        -- Writes a chunk to the file, also feeding it to the
        -- incremental verifier when there is one.
        writechunk h c = case incrementalverifier of
            Nothing -> S.hPut h c
            Just iv -> S.hPut h c >> updateIncrementalVerifier iv c

    -- Sends the content of a file to the peer, starting at the offset.
    -- The file handle is closed once the sender action has finished,
    -- or if an exception is thrown.
    sinkfile f (Offset o) checkchanged sender p ti =
        bracket (liftIO $ openBinaryFile f ReadMode) (liftIO . hClose) $ \h -> do
            unless (o == 0) $
                liftIO $ hSeek h AbsoluteSeek o
            b <- liftIO $ hGetContentsMetered h meter
            r <- runner (sender b validitycheck)
            indicatetransferred ti
            return r
      where
        meter = offsetMeterUpdate p (toBytesProcessed o)
        -- The validity sent to the peer comes from running
        -- checkchanged: True is sent as Valid, False as Invalid.
        validitycheck = local $ runValidityCheck $
            tovalidity <$> checkchanged
        tovalidity ok = if ok then Valid else Invalid

    -- This allows using actions like download and viaTmp
    -- that may abort a transfer, and clean up the protocol after them.
    --
    -- Runs an action that may make a transfer, passing a transfer
    -- indicator. The action should call indicatetransferred on it,
    -- only after it has actually sent/received all the data.
    --
    -- If the action ends without having called indicatetransferred,
    -- runs the fallback action, which can close the protocol
    -- connection or otherwise clean up after the transfer not having
    -- occurred.
    --
    -- If the action throws an exception, the fallback is not run.
    checktransfer ta fallback = do
        ti <- liftIO $ newTVarIO False
        r <- ta ti
        transferred <- liftIO $ atomically (readTVar ti)
        if transferred
            then return r
            else fallback

    -- Marks the transfer indicator, recording that the transfer
    -- actually happened.
    indicatetransferred ti = liftIO (atomically (writeTVar ti True))