Merge branch 's3-aws'
This commit is contained in:
commit
911ba8d972
12 changed files with 493 additions and 225 deletions
|
@ -1,6 +1,6 @@
|
|||
{- Amazon Web Services common infrastructure.
|
||||
-
|
||||
- Copyright 2011,2012 Joey Hess <joey@kitenet.net>
|
||||
- Copyright 2011-2014 Joey Hess <joey@kitenet.net>
|
||||
-
|
||||
- Licensed under the GNU GPL version 3 or higher.
|
||||
-}
|
||||
|
@ -12,8 +12,14 @@ module Remote.Helper.AWS where
|
|||
import Common.Annex
|
||||
import Creds
|
||||
|
||||
import qualified Aws
|
||||
import qualified Aws.S3 as S3
|
||||
import qualified Data.Map as M
|
||||
import qualified Data.ByteString as B
|
||||
import qualified Data.Text as T
|
||||
import Data.Text.Encoding (encodeUtf8)
|
||||
import Data.Text (Text)
|
||||
import Data.IORef
|
||||
|
||||
creds :: UUID -> CredPairStorage
|
||||
creds u = CredPairStorage
|
||||
|
@ -22,6 +28,13 @@ creds u = CredPairStorage
|
|||
, credPairRemoteKey = Just "s3creds"
|
||||
}
|
||||
|
||||
genCredentials :: CredPair -> IO Aws.Credentials
|
||||
genCredentials (keyid, secret) = Aws.Credentials
|
||||
<$> pure (encodeUtf8 (T.pack keyid))
|
||||
<*> pure (encodeUtf8 (T.pack secret))
|
||||
<*> newIORef []
|
||||
<*> pure Nothing
|
||||
|
||||
data Service = S3 | Glacier
|
||||
deriving (Eq)
|
||||
|
||||
|
@ -33,9 +46,10 @@ regionMap = M.fromList . regionInfo
|
|||
defaultRegion :: Service -> Region
|
||||
defaultRegion = snd . Prelude.head . regionInfo
|
||||
|
||||
{- S3 and Glacier use different names for some regions. Ie, "us-east-1"
|
||||
- cannot be used with S3, while "US" cannot be used with Glacier. Dunno why.
|
||||
- Also, Glacier is not yet available in all regions. -}
|
||||
data ServiceRegion = BothRegion Region | S3Region Region | GlacierRegion Region
|
||||
|
||||
{- The "US" and "EU" names are used as location constraints when creating a
|
||||
- S3 bucket. -}
|
||||
regionInfo :: Service -> [(Text, Region)]
|
||||
regionInfo service = map (\(t, r) -> (t, fromServiceRegion r)) $
|
||||
filter (matchingService . snd) $
|
||||
|
@ -45,9 +59,7 @@ regionInfo service = map (\(t, r) -> (t, fromServiceRegion r)) $
|
|||
[ ("US East (N. Virginia)", [S3Region "US", GlacierRegion "us-east-1"])
|
||||
, ("US West (Oregon)", [BothRegion "us-west-2"])
|
||||
, ("US West (N. California)", [BothRegion "us-west-1"])
|
||||
-- Requires AWS4-HMAC-SHA256 which S3 library does not
|
||||
-- currently support.
|
||||
-- , ("EU (Frankfurt)", [BothRegion "eu-central-1"])
|
||||
, ("EU (Frankfurt)", [BothRegion "eu-central-1"])
|
||||
, ("EU (Ireland)", [S3Region "EU", GlacierRegion "eu-west-1"])
|
||||
, ("Asia Pacific (Singapore)", [S3Region "ap-southeast-1"])
|
||||
, ("Asia Pacific (Tokyo)", [BothRegion "ap-northeast-1"])
|
||||
|
@ -63,4 +75,14 @@ regionInfo service = map (\(t, r) -> (t, fromServiceRegion r)) $
|
|||
matchingService (S3Region _) = service == S3
|
||||
matchingService (GlacierRegion _) = service == Glacier
|
||||
|
||||
data ServiceRegion = BothRegion Region | S3Region Region | GlacierRegion Region
|
||||
s3HostName :: Region -> B.ByteString
|
||||
s3HostName "US" = "s3.amazonaws.com"
|
||||
s3HostName "EU" = "s3-eu-west-1.amazonaws.com"
|
||||
s3HostName r = encodeUtf8 $ T.concat ["s3-", r, ".amazonaws.com"]
|
||||
|
||||
s3DefaultHost :: String
|
||||
s3DefaultHost = "s3.amazonaws.com"
|
||||
|
||||
mkLocationConstraint :: Region -> S3.LocationConstraint
|
||||
mkLocationConstraint "US" = S3.locationUsClassic
|
||||
mkLocationConstraint r = r
|
||||
|
|
|
@ -5,13 +5,15 @@
|
|||
- Licensed under the GNU GPL version 3 or higher.
|
||||
-}
|
||||
|
||||
{-# LANGUAGE BangPatterns #-}
|
||||
|
||||
module Remote.Helper.Http where
|
||||
|
||||
import Common.Annex
|
||||
import Types.StoreRetrieve
|
||||
import Utility.Metered
|
||||
import Remote.Helper.Special
|
||||
import Network.HTTP.Client (RequestBody(..), Response, responseStatus, responseBody, BodyReader)
|
||||
import Network.HTTP.Client (RequestBody(..), Response, responseStatus, responseBody, BodyReader, NeedsPopper)
|
||||
import Network.HTTP.Types
|
||||
|
||||
import qualified Data.ByteString.Lazy as L
|
||||
|
@ -24,17 +26,45 @@ import Control.Concurrent
|
|||
-- Implemented as a fileStorer, so that the content can be streamed
|
||||
-- from the file in constant space.
|
||||
httpStorer :: (Key -> RequestBody -> Annex Bool) -> Storer
|
||||
httpStorer a = fileStorer $ \k f m -> do
|
||||
size <- liftIO $ (fromIntegral . fileSize <$> getFileStatus f :: IO Integer)
|
||||
let streamer sink = withMeteredFile f m $ \b -> do
|
||||
mvar <- newMVar $ L.toChunks b
|
||||
let getnextchunk = modifyMVar mvar $ pure . pop
|
||||
sink getnextchunk
|
||||
let body = RequestBodyStream (fromInteger size) streamer
|
||||
a k body
|
||||
httpStorer a = fileStorer $ \k f m -> a k =<< liftIO (httpBodyStorer f m)
|
||||
|
||||
-- Reads the file and generates a streaming request body, that will update
|
||||
-- the meter as it's sent.
|
||||
httpBodyStorer :: FilePath -> MeterUpdate -> IO RequestBody
|
||||
httpBodyStorer src m = do
|
||||
size <- fromIntegral . fileSize <$> getFileStatus src :: IO Integer
|
||||
let streamer sink = withMeteredFile src m $ \b -> byteStringPopper b sink
|
||||
return $ RequestBodyStream (fromInteger size) streamer
|
||||
|
||||
byteStringPopper :: L.ByteString -> NeedsPopper () -> IO ()
|
||||
byteStringPopper b sink = do
|
||||
mvar <- newMVar $ L.toChunks b
|
||||
let getnextchunk = modifyMVar mvar $ \v ->
|
||||
case v of
|
||||
[] -> return ([], S.empty)
|
||||
(c:cs) -> return (cs, c)
|
||||
sink getnextchunk
|
||||
|
||||
{- Makes a Popper that streams a given number of chunks of a given
|
||||
- size from the handle, updating the meter as the chunks are read. -}
|
||||
handlePopper :: Integer -> Int -> MeterUpdate -> Handle -> NeedsPopper () -> IO ()
|
||||
handlePopper numchunks chunksize meterupdate h sink = do
|
||||
mvar <- newMVar zeroBytesProcessed
|
||||
let getnextchunk = do
|
||||
sent <- takeMVar mvar
|
||||
if sent >= target
|
||||
then do
|
||||
putMVar mvar sent
|
||||
return S.empty
|
||||
else do
|
||||
b <- S.hGet h chunksize
|
||||
let !sent' = addBytesProcessed sent chunksize
|
||||
putMVar mvar sent'
|
||||
meterupdate sent'
|
||||
return b
|
||||
sink getnextchunk
|
||||
where
|
||||
pop [] = ([], S.empty)
|
||||
pop (c:cs) = (cs, c)
|
||||
target = toBytesProcessed (numchunks * fromIntegral chunksize)
|
||||
|
||||
-- Reads the http body and stores it to the specified file, updating the
|
||||
-- meter as it goes.
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue