Remote.Git retrieveKeyFile works with annex+http urls
This includes a bugfix to serveGet, it hung at the end.
This commit is contained in:
parent
a2d1844292
commit
7bd616e169
8 changed files with 67 additions and 189 deletions
|
@ -21,6 +21,7 @@ module Annex.Verify (
|
|||
finishVerifyKeyContentIncrementally,
|
||||
verifyKeyContentIncrementally,
|
||||
IncrementalVerifier(..),
|
||||
writeVerifyChunk,
|
||||
resumeVerifyFromOffset,
|
||||
tailVerify,
|
||||
) where
|
||||
|
@ -215,6 +216,12 @@ verifyKeyContentIncrementally verifyconfig k a = do
|
|||
a miv
|
||||
snd <$> finishVerifyKeyContentIncrementally miv
|
||||
|
||||
writeVerifyChunk :: Maybe IncrementalVerifier -> Handle -> S.ByteString -> IO ()
|
||||
writeVerifyChunk (Just iv) h c = do
|
||||
S.hPut h c
|
||||
updateIncrementalVerifier iv c
|
||||
writeVerifyChunk Nothing h c = S.hPut h c
|
||||
|
||||
{- Given a file handle that is open for reading (and likely also for writing),
|
||||
- and an offset, feeds the current content of the file up to the offset to
|
||||
- the IncrementalVerifier. Leaves the file seeked to the offset.
|
||||
|
|
|
@ -13,18 +13,13 @@ module Command.P2PHttp where
|
|||
|
||||
import Command
|
||||
import P2P.Http.Server
|
||||
import P2P.Http.Client
|
||||
import P2P.Http.Url
|
||||
import qualified P2P.Protocol as P2P
|
||||
import Annex.Url
|
||||
import Utility.Env
|
||||
import Utility.MonotonicClock
|
||||
|
||||
import Servant
|
||||
import qualified Network.Wai.Handler.Warp as Warp
|
||||
import qualified Network.Wai.Handler.WarpTLS as Warp
|
||||
import Servant
|
||||
import Servant.Client.Streaming
|
||||
import Control.Concurrent.STM
|
||||
import Network.Socket (PortNumber)
|
||||
import qualified Data.Map as M
|
||||
import Data.String
|
||||
|
@ -161,138 +156,3 @@ getAuthEnv = do
|
|||
case M.lookup user permmap of
|
||||
Nothing -> (auth, P2P.ServeReadWrite)
|
||||
Just perms -> (auth, perms)
|
||||
|
||||
testLocking = do
|
||||
mgr <- httpManager <$> getUrlOptions
|
||||
burl <- liftIO $ parseBaseUrl "http://localhost:8080/"
|
||||
let k = B64Key (fromJust $ deserializeKey ("SHA256E-s6--5891b5b522d5df086d0ff0b110fbd9d21bb4fc7163af34d08286a2e846f6be03" :: String))
|
||||
res <- liftIO $ clientLockContent (mkClientEnv mgr burl)
|
||||
(P2P.ProtocolVersion 3)
|
||||
k
|
||||
(B64UUID (toUUID ("f11773f0-11e1-45b2-9805-06db16768efe" :: String)))
|
||||
(B64UUID (toUUID ("cu" :: String)))
|
||||
[]
|
||||
Nothing
|
||||
case res of
|
||||
LockResult True (Just lckid) ->
|
||||
liftIO $ clientKeepLocked (mkClientEnv mgr burl)
|
||||
(P2P.ProtocolVersion 3)
|
||||
lckid
|
||||
(B64UUID (toUUID ("f11773f0-11e1-45b2-9805-06db16768efe" :: String)))
|
||||
(B64UUID (toUUID ("cu" :: String)))
|
||||
[]
|
||||
Nothing $ \keeplocked -> do
|
||||
print "running, press enter to drop lock"
|
||||
_ <- getLine
|
||||
atomically $ writeTMVar keeplocked False
|
||||
_ -> liftIO $ print ("lockin failed", res)
|
||||
|
||||
testLockContent = do
|
||||
mgr <- httpManager <$> getUrlOptions
|
||||
burl <- liftIO $ parseBaseUrl "http://localhost:8080/"
|
||||
res <- liftIO $ clientLockContent (mkClientEnv mgr burl)
|
||||
(P2P.ProtocolVersion 3)
|
||||
(B64Key (fromJust $ deserializeKey ("SHA256E-s6--5891b5b522d5df086d0ff0b110fbd9d21bb4fc7163af34d08286a2e846f6be03" :: String)))
|
||||
(B64UUID (toUUID ("f11773f0-11e1-45b2-9805-06db16768efe" :: String)))
|
||||
(B64UUID (toUUID ("cu" :: String)))
|
||||
[]
|
||||
Nothing
|
||||
liftIO $ print res
|
||||
|
||||
testKeepLocked = do
|
||||
mgr <- httpManager <$> getUrlOptions
|
||||
burl <- liftIO $ parseBaseUrl "http://localhost:8080/"
|
||||
liftIO $ clientKeepLocked (mkClientEnv mgr burl)
|
||||
(P2P.ProtocolVersion 3)
|
||||
(B64UUID (toUUID ("lck" :: String)))
|
||||
(B64UUID (toUUID ("f11773f0-11e1-45b2-9805-06db16768efe" :: String)))
|
||||
(B64UUID (toUUID ("cu" :: String)))
|
||||
[]
|
||||
Nothing $ \keeplocked -> do
|
||||
print "running, press enter to drop lock"
|
||||
_ <- getLine
|
||||
atomically $ writeTMVar keeplocked False
|
||||
|
||||
testGet = do
|
||||
mgr <- httpManager <$> getUrlOptions
|
||||
burl <- liftIO $ parseBaseUrl "http://localhost:8080/"
|
||||
res <- liftIO $ clientGet (mkClientEnv mgr burl)
|
||||
(P2P.ProtocolVersion 3)
|
||||
(B64Key (fromJust $ deserializeKey ("SHA256E-s1048576000--e3b67ce72aa2571c799d6419e3e36828461ac1c78f8ef300c7f9c8ae671c517f" :: String)))
|
||||
(B64UUID (toUUID ("f11773f0-11e1-45b2-9805-06db16768efe" :: String)))
|
||||
(B64UUID (toUUID ("cu" :: String)))
|
||||
[]
|
||||
Nothing
|
||||
Nothing
|
||||
"outfile"
|
||||
liftIO $ print res
|
||||
|
||||
testPut = do
|
||||
mgr <- httpManager <$> getUrlOptions
|
||||
burl <- liftIO $ parseBaseUrl "http://localhost:8080/"
|
||||
res <- clientPut (mkClientEnv mgr burl)
|
||||
(P2P.ProtocolVersion 3)
|
||||
(B64Key (fromJust $ deserializeKey ("SHA256E-s1048576000--b460ca923520db561d01b99483e9e2fe65ff9dfbdd52c17acba6ac4e874e27d5")))
|
||||
(B64UUID (toUUID ("f11773f0-11e1-45b2-9805-06db16768efe" :: String)))
|
||||
(B64UUID (toUUID ("cu" :: String)))
|
||||
[]
|
||||
Nothing
|
||||
Nothing
|
||||
(AssociatedFile (Just "foo"))
|
||||
"emptyfile"
|
||||
0
|
||||
(liftIO (print "validity check") >> return False)
|
||||
liftIO $ print res
|
||||
|
||||
testPutOffset = do
|
||||
mgr <- httpManager <$> getUrlOptions
|
||||
burl <- liftIO $ parseBaseUrl "http://localhost:8080/"
|
||||
res <- liftIO $ clientPutOffset (mkClientEnv mgr burl)
|
||||
(P2P.ProtocolVersion 3)
|
||||
(B64Key (fromJust $ deserializeKey ("SHA256E-s1048576000--b460ca923520db561d01b99483e9e2fe65ff9dfbdd52c17acba6ac4e874e27d5")))
|
||||
(B64UUID (toUUID ("f11773f0-11e1-45b2-9805-06db16768efe" :: String)))
|
||||
(B64UUID (toUUID ("cu" :: String)))
|
||||
[]
|
||||
Nothing
|
||||
liftIO $ print res
|
||||
|
||||
testRemove = do
|
||||
mgr <- httpManager <$> getUrlOptions
|
||||
burl <- liftIO $ parseBaseUrl "http://localhost:8080/"
|
||||
res <- liftIO $ clientRemove (mkClientEnv mgr burl)
|
||||
(P2P.ProtocolVersion 3)
|
||||
(B64Key (fromJust $ deserializeKey ("WORM-s30-m1720547401--foo" :: String)))
|
||||
(B64UUID (toUUID ("f11773f0-11e1-45b2-9805-06db16768efe" :: String)))
|
||||
(B64UUID (toUUID ("cu" :: String)))
|
||||
[]
|
||||
Nothing
|
||||
liftIO $ print res
|
||||
|
||||
testRemoveBefore = do
|
||||
mgr <- httpManager <$> getUrlOptions
|
||||
burl <- liftIO $ parseBaseUrl "http://localhost:8080/"
|
||||
MonotonicTimestamp t <- liftIO currentMonotonicTimestamp
|
||||
--liftIO $ threadDelaySeconds (Seconds 10)
|
||||
let ts = MonotonicTimestamp (t + 10)
|
||||
liftIO $ print ("running with timestamp", ts)
|
||||
res <- liftIO $ clientRemoveBefore (mkClientEnv mgr burl)
|
||||
(P2P.ProtocolVersion 3)
|
||||
(B64Key (fromJust $ deserializeKey ("WORM-s30-m1720617630--bar" :: String)))
|
||||
(B64UUID (toUUID ("f11773f0-11e1-45b2-9805-06db16768efe" :: String)))
|
||||
(B64UUID (toUUID ("cu" :: String)))
|
||||
[]
|
||||
(Timestamp ts)
|
||||
Nothing
|
||||
liftIO $ print res
|
||||
|
||||
testGetTimestamp = do
|
||||
mgr <- httpManager <$> getUrlOptions
|
||||
burl <- liftIO $ parseBaseUrl "http://localhost:8080/"
|
||||
res <- liftIO $ clientGetTimestamp (mkClientEnv mgr burl)
|
||||
(P2P.ProtocolVersion 3)
|
||||
(B64UUID (toUUID ("f11773f0-11e1-45b2-9805-06db16768efe" :: String)))
|
||||
(B64UUID (toUUID ("cu" :: String)))
|
||||
[]
|
||||
Nothing
|
||||
liftIO $ print res
|
||||
|
||||
|
|
|
@ -29,7 +29,6 @@ import Annex.Verify
|
|||
import Control.Monad.Free
|
||||
import Control.Concurrent.STM
|
||||
import Data.Time.Clock.POSIX
|
||||
import qualified Data.ByteString as S
|
||||
|
||||
-- Full interpreter for Proto, that can receive and send objects.
|
||||
runFullProto :: RunState -> P2PConnection -> Proto a -> Annex (Either ProtoFailure a)
|
||||
|
@ -197,12 +196,7 @@ runLocal runst runner a = case a of
|
|||
Right b -> do
|
||||
liftIO $ withBinaryFile dest ReadWriteMode $ \h -> do
|
||||
p' <- resumeVerifyFromOffset o incrementalverifier p h
|
||||
let writechunk = case incrementalverifier of
|
||||
Nothing -> \c -> S.hPut h c
|
||||
Just iv -> \c -> do
|
||||
S.hPut h c
|
||||
updateIncrementalVerifier iv c
|
||||
meteredWrite p' writechunk b
|
||||
meteredWrite p' (writeVerifyChunk incrementalverifier h) b
|
||||
indicatetransferred ti
|
||||
|
||||
rightsize <- do
|
||||
|
|
|
@ -11,7 +11,10 @@
|
|||
{-# LANGUAGE DataKinds, TypeApplications #-}
|
||||
{-# LANGUAGE CPP #-}
|
||||
|
||||
module P2P.Http.Client where
|
||||
module P2P.Http.Client (
|
||||
module P2P.Http.Client,
|
||||
Validity(..),
|
||||
) where
|
||||
|
||||
import Types
|
||||
import Annex.Url
|
||||
|
@ -25,7 +28,9 @@ import P2P.Http.Url
|
|||
import Annex.Common
|
||||
import P2P.Protocol hiding (Offset, Bypass, auth)
|
||||
import Annex.Concurrent
|
||||
import Annex.Verify
|
||||
import Utility.Url (BasicAuth(..))
|
||||
import Utility.Metered
|
||||
import qualified Git.Credential as Git
|
||||
|
||||
import Servant hiding (BasicAuthData(..))
|
||||
|
@ -140,42 +145,33 @@ runP2PHttpClient rmt fallback () = fallback
|
|||
|
||||
#ifdef WITH_SERVANT
|
||||
clientGet
|
||||
:: ClientEnv
|
||||
-> ProtocolVersion
|
||||
-> B64Key
|
||||
-> B64UUID ServerSide
|
||||
-> B64UUID ClientSide
|
||||
-> [B64UUID Bypass]
|
||||
-> Maybe B64FilePath
|
||||
-> Maybe Auth
|
||||
:: MeterUpdate
|
||||
-> Maybe IncrementalVerifier
|
||||
-> Key
|
||||
-> AssociatedFile
|
||||
-> RawFilePath
|
||||
-> IO Validity
|
||||
clientGet clientenv (ProtocolVersion ver) k su cu bypass af auth dest = do
|
||||
-> ClientAction Validity
|
||||
clientGet meterupdate iv k af dest clientenv (ProtocolVersion ver) su cu bypass auth = liftIO $ do
|
||||
startsz <- tryWhenExists $ getFileSize dest
|
||||
let mo = fmap (Offset . fromIntegral) startsz
|
||||
withClientM (cli k cu bypass af mo auth) clientenv $ \case
|
||||
Left err -> throwM err
|
||||
Right respheaders -> do
|
||||
b <- S.unSourceT (getResponse respheaders) gather
|
||||
liftIO $ withBinaryFile (fromRawFilePath dest) WriteMode $ \h -> do
|
||||
case startsz of
|
||||
Just startsz' | startsz' /= 0 ->
|
||||
hSeek h AbsoluteSeek startsz'
|
||||
_ -> noop
|
||||
len <- go 0 h (L.toChunks b)
|
||||
let offset = fmap (Offset . fromIntegral) startsz
|
||||
withClientM (cli (B64Key k) cu bypass baf offset auth) clientenv $ \case
|
||||
Left err -> return (Left err)
|
||||
Right respheaders ->
|
||||
withBinaryFile (fromRawFilePath dest) ReadWriteMode $ \h -> do
|
||||
meterupdate' <- case startsz of
|
||||
Just startsz' ->
|
||||
resumeVerifyFromOffset startsz' iv meterupdate h
|
||||
_ -> return meterupdate
|
||||
b <- S.unSourceT (getResponse respheaders) gather
|
||||
BytesProcessed len <- meteredWrite'
|
||||
meterupdate'
|
||||
(writeVerifyChunk iv h) b
|
||||
let DataLength dl = case lookupResponseHeader @DataLengthHeader' respheaders of
|
||||
Header hdr -> hdr
|
||||
_ -> error "missing data length header"
|
||||
if dl == len
|
||||
then return Valid
|
||||
else return Invalid
|
||||
return $ Right $
|
||||
if dl == len then Valid else Invalid
|
||||
where
|
||||
go n _ [] = return n
|
||||
go n h (b:bs) = do
|
||||
let !n' = n + fromIntegral (B.length b)
|
||||
B.hPut h b
|
||||
go n' h bs
|
||||
|
||||
cli =case ver of
|
||||
3 -> v3 su V3
|
||||
2 -> v2 su V2
|
||||
|
@ -191,6 +187,10 @@ clientGet clientenv (ProtocolVersion ver) k su cu bypass af auth dest = do
|
|||
gather' (S.Skip s) = gather' s
|
||||
gather' (S.Effect ms) = ms >>= gather'
|
||||
gather' (S.Yield v s) = LI.Chunk v <$> unsafeInterleaveIO (gather' s)
|
||||
|
||||
baf = associatedFileToB64FilePath af
|
||||
#else
|
||||
clientGet _ _ _ = ()
|
||||
#endif
|
||||
|
||||
clientCheckPresent :: Key -> ClientAction Bool
|
||||
|
|
|
@ -175,6 +175,7 @@ serveGet st su apiver (B64Key k) cu bypass baf startat sec auth = do
|
|||
validity <- atomically $ takeTMVar validityv
|
||||
sz <- takeMVar szv
|
||||
atomically $ putTMVar finalv ()
|
||||
atomically $ putTMVar endv ()
|
||||
return $ case validity of
|
||||
Nothing -> True
|
||||
Just Valid -> True
|
||||
|
@ -198,11 +199,9 @@ serveGet st su apiver (B64Key k) cu bypass baf startat sec auth = do
|
|||
Just (Offset o) -> fromIntegral o
|
||||
Nothing -> 0
|
||||
|
||||
getreq offset = P2P.Protocol.GET offset (ProtoAssociatedFile af) k
|
||||
getreq offset = P2P.Protocol.GET offset af k
|
||||
|
||||
af = AssociatedFile $ case baf of
|
||||
Just (B64FilePath f) -> Just f
|
||||
Nothing -> Nothing
|
||||
af = ProtoAssociatedFile $ b64FilePathToAssociatedFile baf
|
||||
|
||||
serveCheckPresent
|
||||
:: APIVersion v
|
||||
|
@ -345,9 +344,7 @@ servePut st resultmangle su apiver (DataLength len) (B64Key k) cu bypass baf mof
|
|||
Just (Offset o) -> o
|
||||
Nothing -> 0
|
||||
|
||||
af = AssociatedFile $ case baf of
|
||||
Just (B64FilePath f) -> Just f
|
||||
Nothing -> Nothing
|
||||
af = b64FilePathToAssociatedFile baf
|
||||
|
||||
-- Streams the ByteString from the client. Avoids returning a longer
|
||||
-- than expected ByteString by truncating to the expected length.
|
||||
|
|
|
@ -50,6 +50,14 @@ newtype B64Key = B64Key Key
|
|||
newtype B64FilePath = B64FilePath RawFilePath
|
||||
deriving (Show)
|
||||
|
||||
associatedFileToB64FilePath :: AssociatedFile -> Maybe B64FilePath
|
||||
associatedFileToB64FilePath (AssociatedFile Nothing) = Nothing
|
||||
associatedFileToB64FilePath (AssociatedFile (Just f)) = Just (B64FilePath f)
|
||||
|
||||
b64FilePathToAssociatedFile :: Maybe B64FilePath -> AssociatedFile
|
||||
b64FilePathToAssociatedFile Nothing = AssociatedFile Nothing
|
||||
b64FilePathToAssociatedFile (Just (B64FilePath f)) = AssociatedFile (Just f)
|
||||
|
||||
newtype B64UUID t = B64UUID { fromB64UUID :: UUID }
|
||||
deriving (Show, Ord, Eq, Generic, NFData)
|
||||
|
||||
|
|
|
@ -538,7 +538,12 @@ copyFromRemote r st key file dest meterupdate vc = do
|
|||
copyFromRemote'' repo r st key file dest meterupdate vc
|
||||
|
||||
copyFromRemote'' :: Git.Repo -> Remote -> State -> Key -> AssociatedFile -> FilePath -> MeterUpdate -> VerifyConfig -> Annex Verification
|
||||
copyFromRemote'' repo r st@(State connpool _ _ _ _) key file dest meterupdate vc
|
||||
copyFromRemote'' repo r st@(State connpool _ _ _ _) key af dest meterupdate vc
|
||||
| isP2PHttp r = verifyKeyContentIncrementally vc key $ \iv ->
|
||||
metered (Just meterupdate) key bwlimit $ \_ p ->
|
||||
p2pHttpClient r giveup (clientGet p iv key af (encodeBS dest)) >>= \case
|
||||
Valid -> return ()
|
||||
Invalid -> giveup "Transfer failed"
|
||||
| Git.repoIsHttp repo = verifyKeyContentIncrementally vc key $ \iv -> do
|
||||
gc <- Annex.getGitConfig
|
||||
ok <- Url.withUrlOptionsPromptingCreds $
|
||||
|
@ -556,7 +561,7 @@ copyFromRemote'' repo r st@(State connpool _ _ _ _) key file dest meterupdate vc
|
|||
Nothing -> return True
|
||||
copier <- mkFileCopier hardlink st
|
||||
(ok, v) <- runTransfer (Transfer Download u (fromKey id key))
|
||||
Nothing file Nothing stdRetry $ \p ->
|
||||
Nothing af Nothing stdRetry $ \p ->
|
||||
metered (Just (combineMeterUpdate p meterupdate)) key bwlimit $ \_ p' ->
|
||||
copier object dest key p' checksuccess vc
|
||||
if ok
|
||||
|
@ -567,7 +572,7 @@ copyFromRemote'' repo r st@(State connpool _ _ _ _) key file dest meterupdate vc
|
|||
P2PHelper.retrieve
|
||||
(gitconfig r)
|
||||
(Ssh.runProto r connpool (return (False, UnVerified)))
|
||||
key file dest meterupdate vc
|
||||
key af dest meterupdate vc
|
||||
| otherwise = giveup "copying from non-ssh, non-http remote not supported"
|
||||
where
|
||||
bwlimit = remoteAnnexBwLimitDownload (gitconfig r)
|
||||
|
|
|
@ -28,6 +28,13 @@ Planned schedule of work:
|
|||
|
||||
## work notes
|
||||
|
||||
* Test resume of download of large file when large amount of file is
|
||||
already downloaded and verification takes a long time. Will the http
|
||||
connection be dropped due to inactivity? May need to do verification in a
|
||||
separate thread that feeds in the existing file followed by the newly
|
||||
downloaded data. Eg, a version of tailVerify that operates on a handle
|
||||
open for read+write.
|
||||
|
||||
* Rest of Remote.Git needs implementing.
|
||||
|
||||
* git-annex p2phttp serving .well-known for ACME.
|
||||
|
|
Loading…
Reference in a new issue