better byteRetriever
Make the byteRetriever be passed the callback that consumes the bytestring. This way, there's no worries about the lazy bytestring not all being read when the resource that's creating it is closed. Which in turn lets bup, ddar, and S3 each switch from using an unncessary fileRetriver to a byteRetriever. So, more efficient on chunks and encrypted files. The only remaining fileRetrievers are hook and external, which really do retrieve to files.
This commit is contained in:
parent
19b71cfb8f
commit
d05b7b9182
8 changed files with 44 additions and 42 deletions
|
@ -8,6 +8,7 @@
|
||||||
module Remote.Bup (remote) where
|
module Remote.Bup (remote) where
|
||||||
|
|
||||||
import qualified Data.Map as M
|
import qualified Data.Map as M
|
||||||
|
import qualified Data.ByteString.Lazy as L
|
||||||
import Data.ByteString.Lazy.UTF8 (fromString)
|
import Data.ByteString.Lazy.UTF8 (fromString)
|
||||||
|
|
||||||
import Common.Annex
|
import Common.Annex
|
||||||
|
@ -127,12 +128,12 @@ store r buprepo = byteStorer $ \k b p -> do
|
||||||
return True
|
return True
|
||||||
|
|
||||||
retrieve :: BupRepo -> Retriever
|
retrieve :: BupRepo -> Retriever
|
||||||
retrieve buprepo = fileRetriever $ \d k _p ->
|
retrieve buprepo = byteRetriever $ \k sink -> do
|
||||||
liftIO $ withFile d WriteMode $ \h -> do
|
|
||||||
let params = bupParams "join" buprepo [Param $ bupRef k]
|
let params = bupParams "join" buprepo [Param $ bupRef k]
|
||||||
let p = proc "bup" (toCommand params)
|
let p = proc "bup" (toCommand params)
|
||||||
(_, _, _, pid) <- createProcess $ p { std_out = UseHandle h }
|
(_, Just h, _, pid) <- liftIO $ createProcess $ p { std_out = CreatePipe }
|
||||||
forceSuccessProcess p pid
|
liftIO (hClose h >> forceSuccessProcess p pid)
|
||||||
|
`after` (sink =<< liftIO (L.hGetContents h))
|
||||||
|
|
||||||
retrieveCheap :: BupRepo -> Key -> FilePath -> Annex Bool
|
retrieveCheap :: BupRepo -> Key -> FilePath -> Annex Bool
|
||||||
retrieveCheap _ _ _ = return False
|
retrieveCheap _ _ _ = return False
|
||||||
|
|
|
@ -10,8 +10,8 @@ module Remote.Ddar (remote) where
|
||||||
|
|
||||||
import Control.Exception
|
import Control.Exception
|
||||||
import qualified Data.Map as M
|
import qualified Data.Map as M
|
||||||
|
import qualified Data.ByteString.Lazy as L
|
||||||
import System.IO.Error
|
import System.IO.Error
|
||||||
import System.Process
|
|
||||||
|
|
||||||
import Data.String.Utils
|
import Data.String.Utils
|
||||||
import Common.Annex
|
import Common.Annex
|
||||||
|
@ -127,12 +127,12 @@ ddarExtractRemoteCall ddarrepo k =
|
||||||
ddarRemoteCall ddarrepo 'x' [Param "--force-stdout", Param $ key2file k]
|
ddarRemoteCall ddarrepo 'x' [Param "--force-stdout", Param $ key2file k]
|
||||||
|
|
||||||
retrieve :: DdarRepo -> Retriever
|
retrieve :: DdarRepo -> Retriever
|
||||||
retrieve ddarrepo = fileRetriever $ \d k _p -> do
|
retrieve ddarrepo = byteRetriever $ \k sink -> do
|
||||||
(cmd, params) <- ddarExtractRemoteCall ddarrepo k
|
(cmd, params) <- ddarExtractRemoteCall ddarrepo k
|
||||||
liftIO $ withFile d WriteMode $ \h -> do
|
let p = (proc cmd $ toCommand params) { std_out = CreatePipe }
|
||||||
let p = (proc cmd $ toCommand params){ std_out = UseHandle h }
|
(_, Just h, _, pid) <- liftIO $ createProcess p
|
||||||
(_, _, _, pid) <- Common.Annex.createProcess p
|
liftIO (hClose h >> forceSuccessProcess p pid)
|
||||||
forceSuccessProcess p pid
|
`after` (sink =<< liftIO (L.hGetContents h))
|
||||||
|
|
||||||
retrieveCheap :: Key -> FilePath -> Annex Bool
|
retrieveCheap :: Key -> FilePath -> Annex Bool
|
||||||
retrieveCheap _ _ = return False
|
retrieveCheap _ _ = return False
|
||||||
|
|
|
@ -136,8 +136,8 @@ store d chunkconfig k b p = liftIO $ do
|
||||||
|
|
||||||
retrieve :: FilePath -> ChunkConfig -> Preparer Retriever
|
retrieve :: FilePath -> ChunkConfig -> Preparer Retriever
|
||||||
retrieve d (LegacyChunks _) = Legacy.retrieve locations d
|
retrieve d (LegacyChunks _) = Legacy.retrieve locations d
|
||||||
retrieve d _ = simplyPrepare $ byteRetriever $ \k ->
|
retrieve d _ = simplyPrepare $ byteRetriever $ \k sink ->
|
||||||
liftIO $ L.readFile =<< getLocation d k
|
sink =<< liftIO (L.readFile =<< getLocation d k)
|
||||||
|
|
||||||
retrieveCheap :: FilePath -> ChunkConfig -> Key -> FilePath -> Annex Bool
|
retrieveCheap :: FilePath -> ChunkConfig -> Key -> FilePath -> Annex Bool
|
||||||
-- no cheap retrieval possible for chunks
|
-- no cheap retrieval possible for chunks
|
||||||
|
|
|
@ -94,14 +94,14 @@ retrieve locations d basek a = do
|
||||||
tmpdir <- fromRepo $ gitAnnexTmpMiscDir
|
tmpdir <- fromRepo $ gitAnnexTmpMiscDir
|
||||||
createAnnexDirectory tmpdir
|
createAnnexDirectory tmpdir
|
||||||
let tmp = tmpdir </> keyFile basek ++ ".directorylegacy.tmp"
|
let tmp = tmpdir </> keyFile basek ++ ".directorylegacy.tmp"
|
||||||
a $ Just $ byteRetriever $ \k -> liftIO $ do
|
a $ Just $ byteRetriever $ \k sink -> do
|
||||||
void $ withStoredFiles d locations k $ \fs -> do
|
liftIO $ void $ withStoredFiles d locations k $ \fs -> do
|
||||||
forM_ fs $
|
forM_ fs $
|
||||||
S.appendFile tmp <=< S.readFile
|
S.appendFile tmp <=< S.readFile
|
||||||
return True
|
return True
|
||||||
b <- L.readFile tmp
|
b <- liftIO $ L.readFile tmp
|
||||||
nukeFile tmp
|
liftIO $ nukeFile tmp
|
||||||
return b
|
sink b
|
||||||
|
|
||||||
checkPresent :: FilePath -> (FilePath -> Key -> [FilePath]) -> Key -> Annex (Either String Bool)
|
checkPresent :: FilePath -> (FilePath -> Key -> [FilePath]) -> Key -> Annex (Either String Bool)
|
||||||
checkPresent d locations k = liftIO $ catchMsgIO $
|
checkPresent d locations k = liftIO $ catchMsgIO $
|
||||||
|
|
|
@ -20,7 +20,6 @@ import Config.Cost
|
||||||
import Remote.Helper.Special
|
import Remote.Helper.Special
|
||||||
import Remote.Helper.ChunkedEncryptable
|
import Remote.Helper.ChunkedEncryptable
|
||||||
import qualified Remote.Helper.AWS as AWS
|
import qualified Remote.Helper.AWS as AWS
|
||||||
import Crypto
|
|
||||||
import Creds
|
import Creds
|
||||||
import Utility.Metered
|
import Utility.Metered
|
||||||
import qualified Annex
|
import qualified Annex
|
||||||
|
@ -120,11 +119,10 @@ store r k b p = go =<< glacierEnv c u
|
||||||
return True
|
return True
|
||||||
|
|
||||||
prepareRetrieve :: Remote -> Preparer Retriever
|
prepareRetrieve :: Remote -> Preparer Retriever
|
||||||
prepareRetrieve r = simplyPrepare $ fileRetriever $ \d k p ->
|
prepareRetrieve = simplyPrepare . byteRetriever . retrieve
|
||||||
retrieve r k (readBytes (meteredWriteFile p d))
|
|
||||||
|
|
||||||
retrieve :: Remote -> Key -> (Handle -> IO ()) -> Annex ()
|
retrieve :: Remote -> Key -> (L.ByteString -> Annex Bool) -> Annex Bool
|
||||||
retrieve r k reader = go =<< glacierEnv c u
|
retrieve r k sink = go =<< glacierEnv c u
|
||||||
where
|
where
|
||||||
c = config r
|
c = config r
|
||||||
u = uuid r
|
u = uuid r
|
||||||
|
@ -138,17 +136,21 @@ retrieve r k reader = go =<< glacierEnv c u
|
||||||
go Nothing = error "cannot retrieve from glacier"
|
go Nothing = error "cannot retrieve from glacier"
|
||||||
go (Just e) = do
|
go (Just e) = do
|
||||||
let cmd = (proc "glacier" (toCommand params)) { env = Just e }
|
let cmd = (proc "glacier" (toCommand params)) { env = Just e }
|
||||||
ok <- liftIO $ catchBoolIO $
|
(_, Just h, _, pid) <- liftIO $ createProcess cmd
|
||||||
withHandle StdoutHandle createProcessSuccess cmd $ \h ->
|
-- Glacier cannot store empty files, so if the output is
|
||||||
ifM (hIsEOF h)
|
-- empty, the content is not available yet.
|
||||||
|
ok <- ifM (liftIO $ hIsEOF h)
|
||||||
( return False
|
( return False
|
||||||
, do
|
, sink =<< liftIO (L.hGetContents h)
|
||||||
reader h
|
|
||||||
return True
|
|
||||||
)
|
)
|
||||||
|
liftIO $ hClose h
|
||||||
|
liftIO $ forceSuccessProcess cmd pid
|
||||||
unless ok $ do
|
unless ok $ do
|
||||||
showLongNote "Recommend you wait up to 4 hours, and then run this command again."
|
showLongNote "Recommend you wait up to 4 hours, and then run this command again."
|
||||||
error "not yet available"
|
return ok
|
||||||
|
|
||||||
|
retrieveCheap :: Remote -> Key -> FilePath -> Annex Bool
|
||||||
|
retrieveCheap _ _ _ = return False
|
||||||
|
|
||||||
remove :: Remote -> Key -> Annex Bool
|
remove :: Remote -> Key -> Annex Bool
|
||||||
remove r k = glacierAction r
|
remove r k = glacierAction r
|
||||||
|
@ -159,9 +161,6 @@ remove r k = glacierAction r
|
||||||
, Param $ archive r k
|
, Param $ archive r k
|
||||||
]
|
]
|
||||||
|
|
||||||
retrieveCheap :: Remote -> Key -> FilePath -> Annex Bool
|
|
||||||
retrieveCheap _ _ _ = return False
|
|
||||||
|
|
||||||
checkPresent :: Remote -> Key -> Annex (Either String Bool)
|
checkPresent :: Remote -> Key -> Annex (Either String Bool)
|
||||||
checkPresent r k = do
|
checkPresent r k = do
|
||||||
showAction $ "checking " ++ name r
|
showAction $ "checking " ++ name r
|
||||||
|
|
|
@ -77,9 +77,11 @@ fileRetriever a k m callback = do
|
||||||
a f k m
|
a f k m
|
||||||
callback (FileContent f)
|
callback (FileContent f)
|
||||||
|
|
||||||
-- A Retriever that generates a L.ByteString containing the Key's content.
|
-- A Retriever that generates a lazy ByteString containing the Key's
|
||||||
byteRetriever :: (Key -> Annex L.ByteString) -> Retriever
|
-- content, and passes it to a callback action which will fully consume it
|
||||||
byteRetriever a k _m callback = callback =<< (ByteContent <$> a k)
|
-- before returning.
|
||||||
|
byteRetriever :: (Key -> (L.ByteString -> Annex Bool) -> Annex Bool) -> Retriever
|
||||||
|
byteRetriever a k _m callback = a k (callback . ByteContent)
|
||||||
|
|
||||||
{- The base Remote that is provided to chunkedEncryptableRemote
|
{- The base Remote that is provided to chunkedEncryptableRemote
|
||||||
- needs to have storeKey and retreiveKeyFile methods, but they are
|
- needs to have storeKey and retreiveKeyFile methods, but they are
|
||||||
|
|
|
@ -147,9 +147,9 @@ store (conn, bucket) r k p file = do
|
||||||
|
|
||||||
prepareRetrieve :: Remote -> Preparer Retriever
|
prepareRetrieve :: Remote -> Preparer Retriever
|
||||||
prepareRetrieve r = resourcePrepare (const $ s3Action r False) $ \(conn, bucket) ->
|
prepareRetrieve r = resourcePrepare (const $ s3Action r False) $ \(conn, bucket) ->
|
||||||
byteRetriever $ \k ->
|
byteRetriever $ \k sink ->
|
||||||
liftIO (getObject conn $ bucketKey r bucket k)
|
liftIO (getObject conn $ bucketKey r bucket k)
|
||||||
>>= either s3Error (return . obj_data)
|
>>= either s3Error (sink . obj_data)
|
||||||
|
|
||||||
retrieveCheap :: Remote -> Key -> FilePath -> Annex Bool
|
retrieveCheap :: Remote -> Key -> FilePath -> Annex Bool
|
||||||
retrieveCheap _ _ _ = return False
|
retrieveCheap _ _ _ = return False
|
||||||
|
|
|
@ -30,6 +30,6 @@ isByteContent (FileContent _) = False
|
||||||
type Storer = Key -> ContentSource -> MeterUpdate -> Annex Bool
|
type Storer = Key -> ContentSource -> MeterUpdate -> Annex Bool
|
||||||
|
|
||||||
-- Action that retrieves a Key's content from a remote, passing it to a
|
-- Action that retrieves a Key's content from a remote, passing it to a
|
||||||
-- callback.
|
-- callback, which will fully consume the content before returning.
|
||||||
-- Throws exception if key is not present, or remote is not accessible.
|
-- Throws exception if key is not present, or remote is not accessible.
|
||||||
type Retriever = Key -> MeterUpdate -> (ContentSource -> Annex Bool) -> Annex Bool
|
type Retriever = Key -> MeterUpdate -> (ContentSource -> Annex Bool) -> Annex Bool
|
||||||
|
|
Loading…
Reference in a new issue