99b7a0cfe9
Only clusters still need to be fixed to close this todo.
283 lines
9.2 KiB
Haskell
283 lines
9.2 KiB
Haskell
{- P2P protocol, Annex implementation
 -
 - Copyright 2016-2024 Joey Hess <id@joeyh.name>
 -
 - Licensed under the GNU AGPL version 3 or higher.
 -}
|
|
|
|
{-# LANGUAGE RankNTypes, FlexibleContexts #-}
|
|
|
|
module P2P.Annex
|
|
( RunState(..)
|
|
, mkRunState
|
|
, P2PConnection(..)
|
|
, runFullProto
|
|
) where
|
|
|
|
import Annex.Common
|
|
import Annex.Content
|
|
import Annex.Transfer
|
|
import Annex.ChangedRefs
|
|
import P2P.Protocol
|
|
import P2P.IO
|
|
import Logs.Location
|
|
import Types.NumCopies
|
|
import Utility.Metered
|
|
import Utility.MonotonicClock
|
|
import Annex.Verify
|
|
|
|
import Control.Monad.Free
|
|
import Control.Concurrent.STM
|
|
import Data.Time.Clock.POSIX
|
|
import qualified Data.ByteString as S
|
|
|
|
-- | Full interpreter for 'Proto', that can receive and send objects.
--
-- Walks the protocol action one step at a time: a finished computation
-- yields its value, a network step is handed to 'runNet', and a local
-- step is handed to 'runLocal'. Both are passed this same interpreter so
-- they can continue running the remainder of the protocol.
runFullProto :: RunState -> P2PConnection -> Proto a -> Annex (Either ProtoFailure a)
runFullProto runst conn = interpret
  where
    interpret :: RunProto Annex
    interpret proto = case proto of
        Pure result -> pure (Right result)
        Free (Net netstep) -> runNet runst conn interpret netstep
        Free (Local localstep) -> runLocal runst interpret localstep
|
|
|
|
-- | Interprets a single local protocol action.
--
-- Dispatches on the 'LocalF' constructor, performs the corresponding
-- action in the 'Annex' monad, and then continues the protocol by
-- passing the continuation to @runner@.
--
-- Non-async exceptions thrown by the underlying actions are caught with
-- 'tryNonAsync' and reported as 'ProtoFailureException', except in the
-- 'TryLockContent' case, where a failure is reported to the peer via
-- @protoaction False@ instead.
runLocal :: RunState -> RunProto Annex -> LocalF (Proto a) -> Annex (Either ProtoFailure a)
runLocal runst runner a = case a of
    TmpContentSize k next -> do
        tmp <- fromRepo $ gitAnnexTmpObjectLocation k
        -- A size that cannot be read is treated as 0.
        size <- liftIO $ catchDefaultIO 0 $ getFileSize tmp
        runner (next (Len size))
    FileSize f next -> do
        -- A size that cannot be read is treated as 0.
        size <- liftIO $ catchDefaultIO 0 $ getFileSize (toRawFilePath f)
        runner (next (Len size))
    ContentSize k next -> do
        let getsize = liftIO . catchMaybeIO . getFileSize
        size <- inAnnex' isJust Nothing getsize k
        runner (next (Len <$> size))
    ReadContent k af o offset sender next -> do
        -- Runs a transfer action, and continues the protocol with its
        -- result, unless it threw an exception or returned a failure.
        let proceed c = do
                r <- tryNonAsync c
                case r of
                    Left e -> exceptionfailure e
                    Right (Left e) -> return $ Left e
                    Right (Right ok) -> runner (next ok)
        -- If the content is not present, or the transfer doesn't
        -- run for any other reason, the sender action still must
        -- be run, so is given empty and Invalid data.
        let fallback = runner (sender mempty (return Invalid))
        v <- tryNonAsync $ prepSendAnnex k o
        case v of
            Right (Just (f, _sz, checkchanged)) -> proceed $ do
                -- alwaysUpload to allow multiple uploads of the same key.
                let runtransfer ti = transfer alwaysUpload k af Nothing $ \p ->
                        sinkfile f offset checkchanged sender p ti
                checktransfer runtransfer fallback
            Right Nothing -> proceed fallback
            Left e -> exceptionfailure e
    StoreContent k af o l getb validitycheck next -> do
        -- This is the same as the retrievalSecurityPolicy of
        -- Remote.P2P and Remote.Git.
        let rsp = RetrievalAllKeysSecure
        v <- tryNonAsync $ do
            iv <- startVerifyKeyContentIncrementally DefaultVerify k
            let runtransfer ti =
                    Right <$> transfer download' k af Nothing (\p ->
                        logStatusAfter k $ getViaTmp rsp DefaultVerify k af Nothing $ \tmp ->
                            storefile (fromRawFilePath tmp) o l getb iv validitycheck p ti)
            let fallback = return $ Left $
                    ProtoFailureMessage "transfer already in progress, or unable to take transfer lock"
            checktransfer runtransfer fallback
        case v of
            Left e -> exceptionfailure e
            Right (Left e) -> return $ Left e
            Right (Right ok) -> runner (next ok)
    StoreContentTo dest iv o l getb validitycheck next -> do
        v <- tryNonAsync $ do
            -- Stores directly to dest, with no meter updates.
            let runtransfer ti = Right
                    <$> storefile dest o l getb iv validitycheck nullMeterUpdate ti
            let fallback = return $ Left $
                    ProtoFailureMessage "Transfer failed"
            checktransfer runtransfer fallback
        case v of
            Left e -> exceptionfailure e
            Right (Left e) -> return $ Left e
            Right (Right ok) -> runner (next ok)
    SetPresent k u next -> do
        v <- tryNonAsync $ logChange k u InfoPresent
        case v of
            Left e -> exceptionfailure e
            Right () -> runner next
    CheckContentPresent k next -> do
        v <- tryNonAsync $ inAnnex k
        case v of
            Left e -> exceptionfailure e
            Right result -> runner (next result)
    RemoveContent k mts next -> do
        let cleanup = do
                logStatus k InfoMissing
                return True
        -- When a timestamp is provided, removal only goes ahead while
        -- the current monotonic timestamp is still before it.
        let checkts = case mts of
                Nothing -> return True
                Just ts -> do
                    now <- liftIO currentMonotonicTimestamp
                    return (now < ts)
        v <- tryNonAsync $
            ifM (Annex.Content.inAnnex k)
                ( lockContentForRemoval k cleanup $ \contentlock ->
                    ifM checkts
                        ( do
                            removeAnnex contentlock
                            cleanup
                        , return False
                        )
                -- Content that is not present counts as removed.
                , return True
                )
        case v of
            Left e -> exceptionfailure e
            Right result -> runner (next result)
    TryLockContent k protoaction next -> do
        v <- tryNonAsync $ lockContentShared k (Just p2pDefaultLockContentRetentionDuration) $ \verifiedcopy ->
            case verifiedcopy of
                LockedCopy _ -> runner (protoaction True)
                _ -> runner (protoaction False)
        -- If locking fails, lockContentShared throws an exception.
        -- Let the peer know it failed.
        case v of
            Left _ -> runner $ do
                protoaction False
                next
            Right _ -> runner next
    WaitRefChange next -> case runst of
        Serving _ (Just h) _ -> do
            v <- tryNonAsync $ liftIO $ waitChangedRefs h
            case v of
                Left e -> exceptionfailure e
                Right changedrefs -> runner (next changedrefs)
        _ -> return $ Left $
            ProtoFailureMessage "change notification not available"
    UpdateMeterTotalSize m sz next -> do
        liftIO $ setMeterTotalSize m sz
        runner next
    RunValidityCheck checkaction next ->
        runner . next =<< checkaction
    GetLocalCurrentTime next ->
        runner . next =<< liftIO getPOSIXTime
  where
    -- Reports an exception caught by tryNonAsync as a protocol failure.
    exceptionfailure e = return $ Left $ ProtoFailureException e

    transfer mk k af sd ta = case runst of
        -- Update transfer logs when serving.
        -- Using noRetry because we're the sender.
        Serving theiruuid _ _ ->
            mk theiruuid k af sd noRetry ta noNotification
        -- Transfer logs are updated higher in the stack when
        -- a client.
        Client _ -> ta nullMeterUpdate

    -- Prepares the handle for writing at a nonzero offset, and returns
    -- the meter update to use for the rest of the transfer.
    --
    -- With an incremental verifier, the first o bytes are read from the
    -- handle and fed to it. Without one, the meter is offset instead.
    resumefromoffset o incrementalverifier p h
        | o /= 0 = do
            p' <- case incrementalverifier of
                Just iv -> do
                    go iv o
                    return p
                _ -> return $ offsetMeterUpdate p (toBytesProcessed o)
            -- Make sure the handle is seeked to the offset.
            -- (Reading the file probably left it there
            -- when that was done, but let's be sure.)
            hSeek h AbsoluteSeek o
            return p'
        | otherwise = return p
      where
        -- Feeds up to n bytes from the handle to the verifier, in
        -- chunks of at most defaultChunkSize. Stops early if a read
        -- returns no data.
        go iv n
            | n == 0 = return ()
            | otherwise = do
                let c = fromIntegral (min n (fromIntegral defaultChunkSize))
                b <- S.hGet h c
                updateIncrementalVerifier iv b
                unless (S.null b) $
                    go iv (n - fromIntegral (S.length b))

    -- Gets the data by running getb, writes it to dest starting at the
    -- offset, and marks the transfer as having happened. Returns whether
    -- the content was stored ok, along with its verification status.
    storefile dest (Offset o) (Len l) getb incrementalverifier validitycheck p ti = do
        v <- runner getb
        case v of
            Right b -> do
                liftIO $ withBinaryFile dest ReadWriteMode $ \h -> do
                    p' <- resumefromoffset o incrementalverifier p h
                    let writechunk = case incrementalverifier of
                            Nothing -> \c -> S.hPut h c
                            Just iv -> \c -> do
                                S.hPut h c
                                updateIncrementalVerifier iv c
                    meteredWrite p' writechunk b
                indicatetransferred ti

                -- The file should now contain the offset plus the
                -- length that was to be received.
                rightsize <- do
                    sz <- liftIO $ getFileSize (toRawFilePath dest)
                    return (toInteger sz == l + o)

                runner validitycheck >>= \case
                    Right (Just Valid) -> case incrementalverifier of
                        Just iv
                            | rightsize -> liftIO (finalizeIncrementalVerifier iv) >>= \case
                                Nothing -> return (True, UnVerified)
                                Just True -> return (True, Verified)
                                Just False -> do
                                    verificationOfContentFailed (toRawFilePath dest)
                                    return (False, UnVerified)
                            | otherwise -> return (False, UnVerified)
                        Nothing -> return (rightsize, UnVerified)
                    Right (Just Invalid) | l == 0 ->
                        -- Special case, for when content was not
                        -- available to send, which is indicated by
                        -- sending 0 bytes and Invalid.
                        return (False, UnVerified)
                    _ ->
                        -- Invalid, or old protocol version.
                        -- Validity is not known. Force content
                        -- verification.
                        return (rightsize, MustVerify)
            Left e -> giveup $ describeProtoFailure e

    -- Opens the file, seeks to the offset when it is nonzero, and runs
    -- the sender on the file's content, along with a validity check
    -- derived from checkchanged. The handle is closed afterwards, even
    -- if an exception is thrown.
    sinkfile f (Offset o) checkchanged sender p ti = bracket setup cleanup go
      where
        setup = liftIO $ openBinaryFile f ReadMode
        cleanup = liftIO . hClose
        go h = do
            let p' = offsetMeterUpdate p (toBytesProcessed o)
            when (o /= 0) $
                liftIO $ hSeek h AbsoluteSeek o
            b <- liftIO $ hGetContentsMetered h p'

            let validitycheck = local $ runValidityCheck $
                    validity <$> checkchanged
            r <- runner (sender b validitycheck)
            indicatetransferred ti
            return r

        validity True = Valid
        validity False = Invalid

    -- This allows using actions like download and viaTmp
    -- that may abort a transfer, and clean up the protocol after them.
    --
    -- Runs an action that may make a transfer, passing a transfer
    -- indicator. The action should call indicatetransferred on it,
    -- only after it's actually sent/received all the data.
    --
    -- If the action ends without having called indicatetransferred,
    -- runs the fallback action, which can close the protocol
    -- connection or otherwise clean up after the transfer not having
    -- occurred.
    --
    -- If the action throws an exception, the fallback is not run.
    checktransfer ta fallback = do
        ti <- liftIO $ newTVarIO False
        r <- ta ti
        ifM (liftIO $ atomically $ readTVar ti)
            ( return r
            , fallback
            )

    -- Records in the transfer indicator that the transfer took place.
    indicatetransferred ti = liftIO $ atomically $ writeTVar ti True