d2b27ca136
This allows lockContentShared to lock content for, e.g., 10 minutes, so that if the process gets terminated before it can unlock, the content will remain locked for that amount of time. The Windows implementation is not yet tested. In P2P.Annex, a duration of 10 minutes is used. This way, when p2pstdio or remotedaemon is serving the P2P protocol, and is asked to LOCKCONTENT, and that process gets killed, the content will not be subject to deletion. This is not a perfect solution to doc/todo/P2P_locking_connection_drop_safety.mdwn yet, but it gets most of the way there, without needing any P2P protocol changes. This is only done in v10 and higher repositories (or on Windows). It might be possible to backport it to v8 or earlier, but it would complicate locking even further, and without a separate lock file, might be hard. I think that by the time this fix reaches a given user, they will probably have been running git-annex 10.x long enough that their v8 repositories will have upgraded to v10 after the 1 year wait. And it's not as if git-annex hasn't already been subject to this problem (though I have not heard of any data loss caused by it) for 6 years already, so waiting another fraction of a year on top of however long it takes this fix to reach users is unlikely to be a problem.
270 lines
8.9 KiB
Haskell
270 lines
8.9 KiB
Haskell
{- P2P protocol, Annex implementation
|
|
-
|
|
- Copyright 2016-2023 Joey Hess <id@joeyh.name>
|
|
-
|
|
- Licensed under the GNU AGPL version 3 or higher.
|
|
-}
|
|
|
|
{-# LANGUAGE RankNTypes, FlexibleContexts #-}
|
|
|
|
module P2P.Annex
|
|
( RunState(..)
|
|
, mkRunState
|
|
, P2PConnection(..)
|
|
, runFullProto
|
|
) where
|
|
|
|
import Annex.Common
|
|
import Annex.Content
|
|
import Annex.Transfer
|
|
import Annex.ChangedRefs
|
|
import P2P.Protocol
|
|
import P2P.IO
|
|
import Logs.Location
|
|
import Types.NumCopies
|
|
import Utility.Metered
|
|
import Utility.HumanTime
|
|
import Annex.Verify
|
|
|
|
import Control.Monad.Free
|
|
import Control.Concurrent.STM
|
|
import qualified Data.ByteString as S
|
|
|
|
-- Full interpreter for Proto, that can receive and send objects.
--
-- Walks the free monad one step at a time: a Pure value ends the run
-- successfully, a Net action is handed to runNet, and a Local action is
-- handed to runLocal. Both are passed this same interpreter so they can
-- continue running the rest of the protocol.
runFullProto :: RunState -> P2PConnection -> Proto a -> Annex (Either ProtoFailure a)
runFullProto runst conn = go
  where
    go :: RunProto Annex
    go (Pure v) = return (Right v)
    go (Free (Net n)) = runNet runst conn go n
    go (Free (Local l)) = runLocal runst go l
|
|
|
|
-- Interpreter for the Local actions of the P2P protocol, run in the
-- Annex monad.
--
-- Each constructor of LocalF is handled by performing the corresponding
-- local operation and then continuing the protocol by passing the
-- continuation to the supplied runner. In most cases, a non-async
-- exception thrown by the local operation is caught and returned as
-- Left (ProtoFailureException e) rather than being propagated.
runLocal :: RunState -> RunProto Annex -> LocalF (Proto a) -> Annex (Either ProtoFailure a)
runLocal runst runner a = case a of
    -- Size of the temp object file for the key; 0 if it cannot be
    -- determined.
    TmpContentSize k next -> do
        tmp <- fromRepo $ gitAnnexTmpObjectLocation k
        size <- liftIO $ catchDefaultIO 0 $ getFileSize tmp
        runner (next (Len size))
    -- Size of an arbitrary file; 0 if it cannot be determined.
    FileSize f next -> do
        size <- liftIO $ catchDefaultIO 0 $ getFileSize (toRawFilePath f)
        runner (next (Len size))
    -- Size of the annexed content of the key, or Nothing.
    ContentSize k next -> do
        let getsize = liftIO . catchMaybeIO . getFileSize
        size <- inAnnex' isJust Nothing getsize k
        runner (next (Len <$> size))
    ReadContent k af o offset sender next -> do
        -- Runs an action that yields the sender's result, and feeds
        -- that result to the continuation.
        let proceed c = do
                r <- tryNonAsync c
                case r of
                    Left e -> return $ Left $ ProtoFailureException e
                    Right (Left e) -> return $ Left e
                    Right (Right ok) -> runner (next ok)
        -- If the content is not present, or the transfer doesn't
        -- run for any other reason, the sender action still must
        -- be run, so is given empty and Invalid data.
        let fallback = runner (sender mempty (return Invalid))
        v <- tryNonAsync $ prepSendAnnex k o
        case v of
            Right (Just (f, _sz, checkchanged)) -> proceed $ do
                -- alwaysUpload to allow multiple uploads of the same key.
                let runtransfer ti = transfer alwaysUpload k af Nothing $ \p ->
                        sinkfile f offset checkchanged sender p ti
                checktransfer runtransfer fallback
            Right Nothing -> proceed fallback
            Left e -> return $ Left $ ProtoFailureException e
    StoreContent k af o l getb validitycheck next -> do
        -- This is the same as the retrievalSecurityPolicy of
        -- Remote.P2P and Remote.Git.
        let rsp = RetrievalAllKeysSecure
        v <- tryNonAsync $ do
            iv <- startVerifyKeyContentIncrementally DefaultVerify k
            let runtransfer ti =
                    Right <$> transfer download' k af Nothing (\p ->
                        logStatusAfter k $ getViaTmp rsp DefaultVerify k af Nothing $ \tmp ->
                            storefile (fromRawFilePath tmp) o l getb iv validitycheck p ti)
            let fallback = return $ Left $
                    ProtoFailureMessage "transfer already in progress, or unable to take transfer lock"
            checktransfer runtransfer fallback
        case v of
            Left e -> return $ Left $ ProtoFailureException e
            Right (Left e) -> return $ Left e
            Right (Right ok) -> runner (next ok)
    -- Like StoreContent, but writes directly to the given destination
    -- file, using the incremental verifier supplied by the caller, and
    -- without updating transfer logs or the progress meter.
    StoreContentTo dest iv o l getb validitycheck next -> do
        v <- tryNonAsync $ do
            let runtransfer ti = Right
                    <$> storefile dest o l getb iv validitycheck nullMeterUpdate ti
            let fallback = return $ Left $
                    ProtoFailureMessage "Transfer failed"
            checktransfer runtransfer fallback
        case v of
            Left e -> return $ Left $ ProtoFailureException e
            Right (Left e) -> return $ Left e
            Right (Right ok) -> runner (next ok)
    SetPresent k u next -> do
        v <- tryNonAsync $ logChange k u InfoPresent
        case v of
            Left e -> return $ Left $ ProtoFailureException e
            Right () -> runner next
    CheckContentPresent k next -> do
        v <- tryNonAsync $ inAnnex k
        case v of
            Left e -> return $ Left $ ProtoFailureException e
            Right result -> runner (next result)
    RemoveContent k next -> do
        -- Records that the content is gone. Also passed to
        -- lockContentForRemoval as its second argument.
        let cleanup = do
                logStatus k InfoMissing
                return True
        v <- tryNonAsync $
            ifM (Annex.Content.inAnnex k)
                ( lockContentForRemoval k cleanup $ \contentlock -> do
                    removeAnnex contentlock
                    cleanup
                -- Content that is not present counts as
                -- successfully removed.
                , return True
                )
        case v of
            Left e -> return $ Left $ ProtoFailureException e
            Right result -> runner (next result)
    TryLockContent k protoaction next -> do
        -- The shared lock is taken with a duration of 10 minutes.
        v <- tryNonAsync $ lockContentShared k (Just (Duration (60*10))) $ \verifiedcopy ->
            case verifiedcopy of
                LockedCopy _ -> runner (protoaction True)
                _ -> runner (protoaction False)
        -- If locking fails, lockContentShared throws an exception.
        -- Let the peer know it failed.
        case v of
            Left _ -> runner $ do
                protoaction False
                next
            Right _ -> runner next
    -- Only available when serving with a ChangedRefs handle.
    WaitRefChange next -> case runst of
        Serving _ (Just h) _ -> do
            v <- tryNonAsync $ liftIO $ waitChangedRefs h
            case v of
                Left e -> return $ Left $ ProtoFailureException e
                Right changedrefs -> runner (next changedrefs)
        _ -> return $ Left $
            ProtoFailureMessage "change notification not available"
    UpdateMeterTotalSize m sz next -> do
        liftIO $ setMeterTotalSize m sz
        runner next
    RunValidityCheck checkaction next -> runner . next =<< checkaction
  where
    -- Wraps a transfer action (which is passed a meter update) using
    -- the given transfer runner, depending on whether this side is
    -- serving or is a client.
    transfer mk k af sd ta = case runst of
        -- Update transfer logs when serving.
        -- Using noRetry because we're the sender.
        Serving theiruuid _ _ ->
            mk theiruuid k af sd noRetry ta noNotification
        -- Transfer logs are updated higher in the stack when
        -- a client.
        Client _ -> ta nullMeterUpdate

    -- Prepares the handle for resuming a write at offset o, and
    -- returns the meter update to use for the rest of the transfer.
    --
    -- With an incremental verifier, the first o bytes already in the
    -- file are read back and fed to the verifier, and the meter update
    -- is returned unchanged. Without one, the meter update is offset
    -- by o. Either way, the handle ends up seeked to o.
    resumefromoffset o incrementalverifier p h
        | o /= 0 = do
            p' <- case incrementalverifier of
                Just iv -> do
                    go iv o
                    return p
                _ -> return $ offsetMeterUpdate p (toBytesProcessed o)
            -- Make sure the handle is seeked to the offset.
            -- (Reading the file probably left it there
            -- when that was done, but let's be sure.)
            hSeek h AbsoluteSeek o
            return p'
        | otherwise = return p
      where
        -- Feeds up to n bytes from the handle to the verifier, in
        -- chunks of at most defaultChunkSize.
        go iv n
            | n == 0 = return ()
            | otherwise = do
                let c = if n > fromIntegral defaultChunkSize
                        then defaultChunkSize
                        else fromIntegral n
                b <- S.hGet h c
                updateIncrementalVerifier iv b
                -- An empty read means the end of the file was
                -- reached before n bytes were read; stop then.
                unless (S.null b) $
                    go iv (n - fromIntegral (S.length b))

    -- Receives data by running getb, writes it to dest starting at
    -- offset o, and then runs the validity check.
    --
    -- Returns a pair of a success flag and the verification status of
    -- the stored content. Throws (via giveup) if getb fails.
    storefile dest (Offset o) (Len l) getb incrementalverifier validitycheck p ti = do
        v <- runner getb
        case v of
            Right b -> do
                liftIO $ withBinaryFile dest ReadWriteMode $ \h -> do
                    p' <- resumefromoffset o incrementalverifier p h
                    -- When verifying incrementally, every chunk
                    -- written is also fed to the verifier.
                    let writechunk = case incrementalverifier of
                            Nothing -> \c -> S.hPut h c
                            Just iv -> \c -> do
                                S.hPut h c
                                updateIncrementalVerifier iv c
                    meteredWrite p' writechunk b
                indicatetransferred ti

                -- The file should now be exactly the resume offset
                -- plus the expected length.
                rightsize <- do
                    sz <- liftIO $ getFileSize (toRawFilePath dest)
                    return (toInteger sz == l + o)

                runner validitycheck >>= \case
                    Right (Just Valid) -> case incrementalverifier of
                        Just iv
                            | rightsize -> liftIO (finalizeIncrementalVerifier iv) >>= \case
                                Nothing -> return (True, UnVerified)
                                Just True -> return (True, Verified)
                                Just False -> do
                                    verificationOfContentFailed (toRawFilePath dest)
                                    return (False, UnVerified)
                            | otherwise -> return (False, UnVerified)
                        Nothing -> return (rightsize, UnVerified)
                    Right (Just Invalid) | l == 0 ->
                        -- Special case, for when
                        -- content was not
                        -- available to send,
                        -- which is indicated by
                        -- sending 0 bytes and
                        -- Invalid.
                        return (False, UnVerified)
                    _ -> do
                        -- Invalid, or old protocol
                        -- version. Validity is not
                        -- known. Force content
                        -- verification.
                        return (rightsize, MustVerify)
            Left e -> giveup $ describeProtoFailure e

    -- Sends the content of file f, starting at offset o, by running
    -- the sender on the file's content. The sender is also given a
    -- validity check that runs checkchanged once the protocol asks
    -- for it. The file handle is closed when done, even on exception.
    sinkfile f (Offset o) checkchanged sender p ti = bracket setup cleanup go
      where
        setup = liftIO $ openBinaryFile f ReadMode
        cleanup = liftIO . hClose
        go h = do
            let p' = offsetMeterUpdate p (toBytesProcessed o)
            when (o /= 0) $
                liftIO $ hSeek h AbsoluteSeek o
            b <- liftIO $ hGetContentsMetered h p'

            -- checkchanged yielding True maps to Valid, False to
            -- Invalid.
            let validitycheck = local $ runValidityCheck $
                    (\ok -> if ok then Valid else Invalid) <$> checkchanged
            r <- runner (sender b validitycheck)
            indicatetransferred ti
            return r

    -- This allows using actions like download and viaTmp
    -- that may abort a transfer, and clean up the protocol after them.
    --
    -- Runs an action that may make a transfer, passing a transfer
    -- indicator. The action should call indicatetransferred on it,
    -- only after it's actually sent/received all the data.
    --
    -- If the action ends without having called indicatetransferred,
    -- runs the fallback action, which can close the protocol
    -- connection or otherwise clean up after the transfer not having
    -- occurred.
    --
    -- If the action throws an exception, the fallback is not run.
    checktransfer ta fallback = do
        ti <- liftIO $ newTVarIO False
        r <- ta ti
        ifM (liftIO $ readTVarIO ti)
            ( return r
            , fallback
            )

    -- Marks the transfer indicator to record that the data was
    -- actually transferred.
    indicatetransferred ti = liftIO $ atomically $ writeTVar ti True
|