S3: convert to aws for store, remove, checkPresent
Fixes the memory leak on store.. the second oldest open git-annex bug! Only retrieve remains to be converted. This commit was sponsored by Scott Robinson.
This commit is contained in:
parent
8eac9eab03
commit
4f007ace87
2 changed files with 69 additions and 65 deletions
|
@ -24,14 +24,18 @@ import Control.Concurrent
|
||||||
-- Implemented as a fileStorer, so that the content can be streamed
|
-- Implemented as a fileStorer, so that the content can be streamed
|
||||||
-- from the file in constant space.
|
-- from the file in constant space.
|
||||||
httpStorer :: (Key -> RequestBody -> Annex Bool) -> Storer
|
httpStorer :: (Key -> RequestBody -> Annex Bool) -> Storer
|
||||||
httpStorer a = fileStorer $ \k f m -> do
|
httpStorer a = fileStorer $ \k f m -> a k =<< liftIO (httpBodyStorer f m)
|
||||||
size <- liftIO $ (fromIntegral . fileSize <$> getFileStatus f :: IO Integer)
|
|
||||||
let streamer sink = withMeteredFile f m $ \b -> do
|
-- Reads the file and generates a streaming request body, that will update
|
||||||
|
-- the meter as it's sent.
|
||||||
|
httpBodyStorer :: FilePath -> MeterUpdate -> IO RequestBody
|
||||||
|
httpBodyStorer src m = do
|
||||||
|
size <- fromIntegral . fileSize <$> getFileStatus src :: IO Integer
|
||||||
|
let streamer sink = withMeteredFile src m $ \b -> do
|
||||||
mvar <- newMVar $ L.toChunks b
|
mvar <- newMVar $ L.toChunks b
|
||||||
let getnextchunk = modifyMVar mvar $ pure . pop
|
let getnextchunk = modifyMVar mvar $ pure . pop
|
||||||
sink getnextchunk
|
sink getnextchunk
|
||||||
let body = RequestBodyStream (fromInteger size) streamer
|
return $ RequestBodyStream (fromInteger size) streamer
|
||||||
a k body
|
|
||||||
where
|
where
|
||||||
pop [] = ([], S.empty)
|
pop [] = ([], S.empty)
|
||||||
pop (c:cs) = (cs, c)
|
pop (c:cs) = (cs, c)
|
||||||
|
|
120
Remote/S3.hs
120
Remote/S3.hs
|
@ -34,9 +34,9 @@ import qualified Git
|
||||||
import Config
|
import Config
|
||||||
import Config.Cost
|
import Config.Cost
|
||||||
import Remote.Helper.Special
|
import Remote.Helper.Special
|
||||||
|
import Remote.Helper.Http
|
||||||
import qualified Remote.Helper.AWS as AWS
|
import qualified Remote.Helper.AWS as AWS
|
||||||
import Creds
|
import Creds
|
||||||
import Utility.Metered
|
|
||||||
import Annex.UUID
|
import Annex.UUID
|
||||||
import Logs.Web
|
import Logs.Web
|
||||||
|
|
||||||
|
@ -54,10 +54,10 @@ gen :: Git.Repo -> UUID -> RemoteConfig -> RemoteGitConfig -> Annex (Maybe Remot
|
||||||
gen r u c gc = new <$> remoteCost gc expensiveRemoteCost
|
gen r u c gc = new <$> remoteCost gc expensiveRemoteCost
|
||||||
where
|
where
|
||||||
new cst = Just $ specialRemote c
|
new cst = Just $ specialRemote c
|
||||||
(prepareStore this)
|
(prepareS3 this $ store this)
|
||||||
(prepareRetrieve this)
|
(prepareRetrieve this)
|
||||||
(simplyPrepare $ remove this c)
|
(prepareS3 this $ remove this)
|
||||||
(simplyPrepare $ checkKey this)
|
(prepareS3 this $ checkKey this)
|
||||||
this
|
this
|
||||||
where
|
where
|
||||||
this = Remote {
|
this = Remote {
|
||||||
|
@ -132,32 +132,22 @@ s3Setup' u c = if isIA c then archiveorg else defaulthost
|
||||||
writeUUIDFile archiveconfig u
|
writeUUIDFile archiveconfig u
|
||||||
use archiveconfig
|
use archiveconfig
|
||||||
|
|
||||||
prepareStore :: Remote -> Preparer Storer
|
-- Sets up a http connection manager for S3 encdpoint, which allows
|
||||||
prepareStore r = resourcePrepare (const $ s3Action r False) $ \(conn, bucket) ->
|
-- http connections to be reused across calls to the helper.
|
||||||
fileStorer $ \k src p -> do
|
prepareS3 :: Remote -> (S3Handle -> helper) -> Preparer helper
|
||||||
ok <- s3Bool =<< liftIO (store (conn, bucket) r k p src)
|
prepareS3 r = resourcePrepare $ const $ withS3Handle (config r) (uuid r)
|
||||||
|
|
||||||
-- Store public URL to item in Internet Archive.
|
store :: Remote -> S3Handle -> Storer
|
||||||
when (ok && isIA (config r) && not (isChunkKey k)) $
|
store r h = fileStorer $ \k f p -> do
|
||||||
setUrlPresent k (iaKeyUrl r k)
|
rbody <- liftIO $ httpBodyStorer f p
|
||||||
|
void $ sendS3Handle h $
|
||||||
return ok
|
S3.putObject (hBucket h) (hBucketObject h k) rbody
|
||||||
|
|
||||||
store :: (AWSConnection, BucketName) -> Remote -> Key -> MeterUpdate -> FilePath -> IO (AWSResult ())
|
-- Store public URL to item in Internet Archive.
|
||||||
store (conn, bucket) r k p file = do
|
when (hIsIA h && not (isChunkKey k)) $
|
||||||
error "TODO"
|
setUrlPresent k (iaKeyUrl r k)
|
||||||
{-
|
|
||||||
size <- (fromIntegral . fileSize <$> getFileStatus file) :: IO Integer
|
return True
|
||||||
withMeteredFile file p $ \content -> do
|
|
||||||
-- size is provided to S3 so the whole content
|
|
||||||
-- does not need to be buffered to calculate it
|
|
||||||
let object = S3Object
|
|
||||||
bucket (bucketFile r k) ""
|
|
||||||
(("Content-Length", show size) : getXheaders (config r))
|
|
||||||
content
|
|
||||||
sendObject conn $
|
|
||||||
setStorageClass (getStorageClass $ config r) object
|
|
||||||
-}
|
|
||||||
|
|
||||||
prepareRetrieve :: Remote -> Preparer Retriever
|
prepareRetrieve :: Remote -> Preparer Retriever
|
||||||
prepareRetrieve r = resourcePrepare (const $ s3Action r False) $ \(conn, bucket) ->
|
prepareRetrieve r = resourcePrepare (const $ s3Action r False) $ \(conn, bucket) ->
|
||||||
|
@ -174,31 +164,37 @@ retrieveCheap _ _ = return False
|
||||||
{- Internet Archive doesn't easily allow removing content.
|
{- Internet Archive doesn't easily allow removing content.
|
||||||
- While it may remove the file, there are generally other files
|
- While it may remove the file, there are generally other files
|
||||||
- derived from it that it does not remove. -}
|
- derived from it that it does not remove. -}
|
||||||
remove :: Remote -> RemoteConfig -> Remover
|
remove :: Remote -> S3Handle -> Remover
|
||||||
remove r c k
|
remove r h k
|
||||||
| isIA c = do
|
| hIsIA h = do
|
||||||
warning "Cannot remove content from the Internet Archive"
|
warning "Cannot remove content from the Internet Archive"
|
||||||
return False
|
return False
|
||||||
| otherwise = remove' r k
|
| otherwise = do
|
||||||
|
res <- tryNonAsync $ sendS3Handle h $
|
||||||
|
S3.DeleteObject (hBucketObject h k) (hBucket h)
|
||||||
|
return $ either (const False) (const True) res
|
||||||
|
|
||||||
remove' :: Remote -> Key -> Annex Bool
|
checkKey :: Remote -> S3Handle -> CheckPresent
|
||||||
remove' r k = s3Action r False $ \(conn, bucket) ->
|
checkKey r h k = do
|
||||||
s3Bool =<< liftIO (deleteObject conn $ bucketKey r bucket k)
|
|
||||||
|
|
||||||
checkKey :: Remote -> CheckPresent
|
|
||||||
checkKey r k = s3Action r noconn $ \(conn, bucket) -> do
|
|
||||||
showAction $ "checking " ++ name r
|
showAction $ "checking " ++ name r
|
||||||
{-
|
catchMissingException $ do
|
||||||
res <- liftIO $ getObjectInfo conn $ bucketKey r bucket k
|
void $ sendS3Handle h $
|
||||||
case res of
|
S3.headObject (hBucket h) (hBucketObject h k)
|
||||||
Right _ -> return True
|
return True
|
||||||
Left (AWSError _ _) -> return False
|
|
||||||
Left e -> s3Error e
|
{- Catch exception headObject returns when an object is not present
|
||||||
-}
|
- in the bucket, and returns False. All other exceptions indicate a
|
||||||
error "TODO"
|
- check error and are let through. -}
|
||||||
|
catchMissingException :: Annex Bool -> Annex Bool
|
||||||
|
catchMissingException a = catchJust missing a (const $ return False)
|
||||||
where
|
where
|
||||||
noconn = error "S3 not configured"
|
-- This is not very good; see
|
||||||
|
-- https://github.com/aristidb/aws/issues/121
|
||||||
|
missing :: AWS.HeaderException -> Maybe ()
|
||||||
|
missing e
|
||||||
|
| AWS.headerErrorMessage e == "ETag missing" = Just ()
|
||||||
|
| otherwise = Nothing
|
||||||
|
|
||||||
s3Warning :: ReqError -> Annex Bool
|
s3Warning :: ReqError -> Annex Bool
|
||||||
s3Warning e = do
|
s3Warning e = do
|
||||||
warning $ prettyReqError e
|
warning $ prettyReqError e
|
||||||
|
@ -216,16 +212,8 @@ s3Action r noconn action = do
|
||||||
(Just b, Just c) -> action (c, b)
|
(Just b, Just c) -> action (c, b)
|
||||||
_ -> return noconn
|
_ -> return noconn
|
||||||
|
|
||||||
bucketFile :: Remote -> Key -> FilePath
|
|
||||||
bucketFile r = munge . key2file
|
|
||||||
where
|
|
||||||
munge s = case M.lookup "mungekeys" c of
|
|
||||||
Just "ia" -> iaMunge $ getFilePrefix c ++ s
|
|
||||||
_ -> getFilePrefix c ++ s
|
|
||||||
c = config r
|
|
||||||
|
|
||||||
bucketKey :: Remote -> BucketName -> Key -> S3Object
|
bucketKey :: Remote -> BucketName -> Key -> S3Object
|
||||||
bucketKey r bucket k = S3Object bucket (bucketFile r k) "" [] L.empty
|
bucketKey r bucket k = S3Object bucket (bucketObject (config r) k) "" [] L.empty
|
||||||
|
|
||||||
{- Generate the bucket if it does not already exist, including creating the
|
{- Generate the bucket if it does not already exist, including creating the
|
||||||
- UUID file within the bucket.
|
- UUID file within the bucket.
|
||||||
|
@ -313,8 +301,12 @@ data S3Handle = S3Handle
|
||||||
{ hmanager :: Manager
|
{ hmanager :: Manager
|
||||||
, hawscfg :: AWS.Configuration
|
, hawscfg :: AWS.Configuration
|
||||||
, hs3cfg :: S3.S3Configuration AWS.NormalQuery
|
, hs3cfg :: S3.S3Configuration AWS.NormalQuery
|
||||||
|
|
||||||
|
-- Cached values.
|
||||||
, hBucket :: S3.Bucket
|
, hBucket :: S3.Bucket
|
||||||
, hStorageClass :: S3.StorageClass
|
, hStorageClass :: S3.StorageClass
|
||||||
|
, hBucketObject :: Key -> S3.Bucket
|
||||||
|
, hIsIA :: Bool
|
||||||
}
|
}
|
||||||
|
|
||||||
{- Sends a request to S3 and gets back the response.
|
{- Sends a request to S3 and gets back the response.
|
||||||
|
@ -339,12 +331,13 @@ withS3Handle c u a = do
|
||||||
bucket <- maybe nobucket (return . T.pack) (getBucketName c)
|
bucket <- maybe nobucket (return . T.pack) (getBucketName c)
|
||||||
let awscfg = AWS.Configuration AWS.Timestamp awscreds (AWS.defaultLog AWS.Error)
|
let awscfg = AWS.Configuration AWS.Timestamp awscreds (AWS.defaultLog AWS.Error)
|
||||||
bracketIO (newManager httpcfg) closeManager $ \mgr ->
|
bracketIO (newManager httpcfg) closeManager $ \mgr ->
|
||||||
a $ S3Handle mgr awscfg s3cfg bucket sc
|
a $ S3Handle mgr awscfg s3cfg bucket sc bo (isIA c)
|
||||||
where
|
where
|
||||||
s3cfg = s3Configuration c
|
s3cfg = s3Configuration c
|
||||||
httpcfg = defaultManagerSettings
|
httpcfg = defaultManagerSettings
|
||||||
{ managerResponseTimeout = Nothing }
|
{ managerResponseTimeout = Nothing }
|
||||||
sc = getStorageClass c
|
sc = getStorageClass c
|
||||||
|
bo = T.pack . bucketObject c -- memoized
|
||||||
nocreds = error "Cannot use S3 without credentials configured"
|
nocreds = error "Cannot use S3 without credentials configured"
|
||||||
nobucket = error "S3 bucket not configured"
|
nobucket = error "S3 bucket not configured"
|
||||||
|
|
||||||
|
@ -390,6 +383,13 @@ getXheaders = filter isxheader . M.assocs
|
||||||
getFilePrefix :: RemoteConfig -> String
|
getFilePrefix :: RemoteConfig -> String
|
||||||
getFilePrefix = M.findWithDefault "" "fileprefix"
|
getFilePrefix = M.findWithDefault "" "fileprefix"
|
||||||
|
|
||||||
|
bucketObject :: RemoteConfig -> Key -> FilePath
|
||||||
|
bucketObject c = munge . key2file
|
||||||
|
where
|
||||||
|
munge s = case M.lookup "mungekeys" c of
|
||||||
|
Just "ia" -> iaMunge $ getFilePrefix c ++ s
|
||||||
|
_ -> getFilePrefix c ++ s
|
||||||
|
|
||||||
{- Internet Archive limits filenames to a subset of ascii,
|
{- Internet Archive limits filenames to a subset of ascii,
|
||||||
- with no whitespace. Other characters are xml entity
|
- with no whitespace. Other characters are xml entity
|
||||||
- encoded. -}
|
- encoded. -}
|
||||||
|
@ -416,6 +416,6 @@ iaItemUrl :: BucketName -> URLString
|
||||||
iaItemUrl bucket = "http://archive.org/details/" ++ bucket
|
iaItemUrl bucket = "http://archive.org/details/" ++ bucket
|
||||||
|
|
||||||
iaKeyUrl :: Remote -> Key -> URLString
|
iaKeyUrl :: Remote -> Key -> URLString
|
||||||
iaKeyUrl r k = "http://archive.org/download/" ++ bucket ++ "/" ++ bucketFile r k
|
iaKeyUrl r k = "http://archive.org/download/" ++ bucket ++ "/" ++ bucketObject (config r) k
|
||||||
where
|
where
|
||||||
bucket = fromMaybe "" $ getBucketName $ config r
|
bucket = fromMaybe "" $ getBucketName $ config r
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue