git-annex/P2P/Annex.hs
Joey Hess 0540e987b3
improve p2p protocol handling of requested object not available
Avoid spurious "verification of content failed" message when downloading
content from a ssh or tor remote fails due to the remote no longer having a
copy of the content.

The P2P protocol already handled this case by sending DATA 0, followed by
VALID. But VALID was not really right, because the data is not the
requested data. So, send DATA 0, followed by INVALID. Old versions of
git-annex handle INVALID the same as VALID in this case. Now new versions
avoid displaying an incorrect message.

It would be better for the P2P protocol to have a different way to indicate
this, like perhaps sending INVALID without DATA. But that would be a
breaking change and need a new protocol version. Since INVALID already is
part of the protocol and already needs to be handled, using it for this
special case too seems ok, and avoids the complication of another protocol
version.

This commit was sponsored by Jochen Bartl on Patreon.
2020-12-01 16:05:55 -04:00

230 lines
7.4 KiB
Haskell

{- P2P protocol, Annex implementation
-
- Copyright 2016-2018 Joey Hess <id@joeyh.name>
-
- Licensed under the GNU AGPL version 3 or higher.
-}
{-# LANGUAGE RankNTypes, FlexibleContexts #-}
module P2P.Annex
( RunState(..)
, mkRunState
, P2PConnection(..)
, runFullProto
) where
import Annex.Common
import Annex.Content
import Annex.Transfer
import Annex.ChangedRefs
import P2P.Protocol
import P2P.IO
import Logs.Location
import Types.NumCopies
import Utility.Metered
import Control.Monad.Free
import Control.Concurrent.STM
-- Complete interpreter for Proto, able to both receive and send objects.
-- Network actions are handed to runNet and local actions to runLocal.
runFullProto :: RunState -> P2PConnection -> Proto a -> Annex (Either ProtoFailure a)
runFullProto runst conn = interpret
  where
    -- Handles one layer of the protocol at a time, passing itself
    -- to runNet and runLocal as the runner for whatever follows.
    interpret :: RunProto Annex
    interpret proto = case proto of
        Pure result -> return $ Right result
        Free (Net netaction) -> runNet runst conn interpret netaction
        Free (Local localaction) -> runLocal runst interpret localaction
-- Interprets a single local (non-network) protocol action in the Annex
-- monad. The runner is used to run whatever part of the protocol comes
-- after the action, as well as any sub-protocols the action carries.
--
-- Most handlers wrap their work in tryNonAsync and report a caught
-- exception as a ProtoFailureException, rather than letting it propagate.
runLocal :: RunState -> RunProto Annex -> LocalF (Proto a) -> Annex (Either ProtoFailure a)
runLocal runst runner a = case a of
    -- Size of the key's temporary object file; catchDefaultIO supplies
    -- 0 as the default when getFileSize does not yield a size.
    TmpContentSize k next -> do
        tmp <- fromRepo $ gitAnnexTmpObjectLocation k
        size <- liftIO $ catchDefaultIO 0 $ getFileSize tmp
        runner (next (Len size))
    -- Size of an arbitrary file, with the same default of 0.
    FileSize f next -> do
        size <- liftIO $ catchDefaultIO 0 $ getFileSize (toRawFilePath f)
        runner (next (Len size))
    -- Size of the key's content as a Maybe; Nothing is the value
    -- given to inAnnex' for when no size is obtained.
    ContentSize k next -> do
        let getsize = liftIO . catchMaybeIO . getFileSize
        size <- inAnnex' isJust Nothing getsize k
        runner (next (Len <$> size))
    ReadContent k af o sender next -> do
        -- Runs a sending action, then continues the protocol with
        -- its result, or stops with the failure it produced.
        let proceed c = do
                r <- tryNonAsync c
                case r of
                    Left e -> return $ Left $ ProtoFailureException e
                    Right (Left e) -> return $ Left e
                    Right (Right ok) -> runner (next ok)
        -- If the content is not present, or the transfer doesn't
        -- run for any other reason, the sender action still must
        -- be run, so is given empty and Invalid data.
        let fallback = runner (sender mempty (return Invalid))
        v <- tryNonAsync $ prepSendAnnex k
        case v of
            Right (Just (f, checkchanged)) -> proceed $ do
                -- alwaysUpload to allow multiple uploads of the same key.
                let runtransfer ti = transfer alwaysUpload k af $ \p ->
                        sinkfile f o checkchanged sender p ti
                checktransfer runtransfer fallback
            Right Nothing -> proceed fallback
            Left e -> return $ Left $ ProtoFailureException e
    StoreContent k af o l getb validitycheck next -> do
        -- This is the same as the retrievalSecurityPolicy of
        -- Remote.P2P and Remote.Git.
        let rsp = RetrievalAllKeysSecure
        v <- tryNonAsync $ do
            -- The data is written to the temp file provided by
            -- getViaTmp.
            let runtransfer ti =
                    Right <$> transfer download k af (\p ->
                        getViaTmp rsp DefaultVerify k af $ \tmp ->
                            storefile (fromRawFilePath tmp) o l getb validitycheck p ti)
            let fallback = return $ Left $
                    ProtoFailureMessage "transfer already in progress, or unable to take transfer lock"
            checktransfer runtransfer fallback
        case v of
            Left e -> return $ Left $ ProtoFailureException e
            Right (Left e) -> return $ Left e
            Right (Right ok) -> runner (next ok)
    -- Like StoreContent, but writes directly to dest, without going
    -- through transfer or getViaTmp, and with no progress meter.
    StoreContentTo dest o l getb validitycheck next -> do
        v <- tryNonAsync $ do
            let runtransfer ti = Right
                    <$> storefile dest o l getb validitycheck nullMeterUpdate ti
            let fallback = return $ Left $
                    ProtoFailureMessage "transfer failed"
            checktransfer runtransfer fallback
        case v of
            Left e -> return $ Left $ ProtoFailureException e
            Right (Left e) -> return $ Left e
            Right (Right ok) -> runner (next ok)
    -- Records in the location log that u has the content of k.
    SetPresent k u next -> do
        v <- tryNonAsync $ logChange k u InfoPresent
        case v of
            Left e -> return $ Left $ ProtoFailureException e
            Right () -> runner next
    CheckContentPresent k next -> do
        v <- tryNonAsync $ inAnnex k
        case v of
            Left e -> return $ Left $ ProtoFailureException e
            Right result -> runner (next result)
    RemoveContent k next -> do
        -- Used both after removing the content here, and passed
        -- to lockContentForRemoval.
        let cleanup = do
                logStatus k InfoMissing
                return True
        v <- tryNonAsync $
            ifM (Annex.Content.inAnnex k)
                ( lockContentForRemoval k cleanup $ \contentlock -> do
                    removeAnnex contentlock
                    cleanup
                -- Content that is not present counts as
                -- successfully removed.
                , return True
                )
        case v of
            Left e -> return $ Left $ ProtoFailureException e
            Right result -> runner (next result)
    TryLockContent k protoaction next -> do
        -- The protoaction is run while the content lock is held,
        -- and is told whether a LockedCopy was obtained.
        v <- tryNonAsync $ lockContentShared k $ \verifiedcopy ->
            case verifiedcopy of
                LockedCopy _ -> runner (protoaction True)
                _ -> runner (protoaction False)
        -- If locking fails, lockContentShared throws an exception.
        -- Let the peer know it failed.
        case v of
            Left _ -> runner $ do
                protoaction False
                next
            Right _ -> runner next
    -- Only handled when serving with a changed refs handle available;
    -- any other run state is reported as a protocol failure.
    WaitRefChange next -> case runst of
        Serving _ (Just h) _ -> do
            v <- tryNonAsync $ liftIO $ waitChangedRefs h
            case v of
                Left e -> return $ Left $ ProtoFailureException e
                Right changedrefs -> runner (next changedrefs)
        _ -> return $ Left $
            ProtoFailureMessage "change notification not available"
    UpdateMeterTotalSize m sz next -> do
        liftIO $ setMeterTotalSize m sz
        runner next
    -- Runs the check in Annex and continues with its result.
    RunValidityCheck checkaction next -> runner . next =<< checkaction
  where
    -- Runs the transfer action ta, either via the transfer runner mk
    -- (such as download or alwaysUpload) or directly, depending on
    -- the run state.
    transfer mk k af ta = case runst of
        -- Update transfer logs when serving.
        -- Using noRetry because we're the sender.
        Serving theiruuid _ _ ->
            mk theiruuid k af noRetry ta noNotification
        -- Transfer logs are updated higher in the stack when
        -- a client.
        Client _ -> ta nullMeterUpdate

    -- Runs getb to obtain the data, and writes it to dest starting at
    -- offset o. The result pairs a Bool with either UnVerified or
    -- MustVerify; outside the special case below, the Bool is whether
    -- dest ended up with size l + o.
    --
    -- A protocol failure while getting the data is thrown with error;
    -- both callers run this inside tryNonAsync.
    storefile dest (Offset o) (Len l) getb validitycheck p ti = do
        -- Shift the meter so progress is reported from the offset.
        let p' = offsetMeterUpdate p (toBytesProcessed o)
        v <- runner getb
        case v of
            Right b -> do
                liftIO $ withBinaryFile dest ReadWriteMode $ \h -> do
                    when (o /= 0) $
                        hSeek h AbsoluteSeek o
                    meteredWrite p' h b
                -- All the data has been received and written.
                indicatetransferred ti
                rightsize <- do
                    sz <- liftIO $ getFileSize (toRawFilePath dest)
                    return (toInteger sz == l + o)
                runner validitycheck >>= \case
                    Right (Just Valid) ->
                        return (rightsize, UnVerified)
                    Right (Just Invalid) | l == 0 ->
                        -- Special case, for when
                        -- content was not
                        -- available to send,
                        -- which is indicated by
                        -- sending 0 bytes and
                        -- Invalid.
                        return (False, UnVerified)
                    _ -> do
                        -- Invalid, or old protocol
                        -- version. Validity is not
                        -- known. Force content
                        -- verification.
                        return (rightsize, MustVerify)
            Left e -> error $ describeProtoFailure e

    -- Opens f for reading, seeks to offset o, and gives the content to
    -- the sender, along with a validity check built from checkchanged.
    -- bracket closes the handle once the sender has been run.
    sinkfile f (Offset o) checkchanged sender p ti = bracket setup cleanup go
      where
        setup = liftIO $ openBinaryFile f ReadMode
        cleanup = liftIO . hClose
        go h = do
            let p' = offsetMeterUpdate p (toBytesProcessed o)
            when (o /= 0) $
                liftIO $ hSeek h AbsoluteSeek o
            b <- liftIO $ hGetContentsMetered h p'
            -- checkchanged returning True maps to Valid, and
            -- False to Invalid.
            let validitycheck = local $ runValidityCheck $
                    checkchanged >>= return . \case
                        False -> Invalid
                        True -> Valid
            r <- runner (sender b validitycheck)
            -- Only reached once the sender has finished.
            indicatetransferred ti
            return r

    -- This allows using actions like download and viaTmp
    -- that may abort a transfer, and clean up the protocol after them.
    --
    -- Runs an action that may make a transfer, passing a transfer
    -- indicator. The action should call indicatetransferred on it,
    -- only after it's actually sent/received all the data.
    --
    -- If the action ends without having called indicatetransferred,
    -- runs the fallback action, which can close the protocol
    -- connection or otherwise clean up after the transfer not having
    -- occurred.
    --
    -- If the action throws an exception, the fallback is not run.
    checktransfer ta fallback = do
        ti <- liftIO $ newTVarIO False
        r <- ta ti
        ifM (liftIO $ atomically $ readTVar ti)
            ( return r
            , fallback
            )

    -- Sets the transfer indicator created by checktransfer to True.
    indicatetransferred ti = liftIO $ atomically $ writeTVar ti True