MapReduceの例。ただし、サンプルのコードそのままでは動かない。
Control.Parallel.Strategiesのドキュメントを読むと、
- NFDataはControl.DeepSeqに移った。
- rnfはもはやStrategyではないので、かわりにrdeepseqを使え。
とのことなので、そのように書きなおしてみた。
$ time ./LineCount +RTS -N1 -RTS test.log
test.log: 2146144
real 0m1.273s
user 0m0.709s
sys 0m0.318s
$ time ./LineCount +RTS -N2 -RTS test.log
test.log: 2146144
real 0m0.652s
user 0m0.781s
sys 0m0.373s
お、速くなった。
MapReduce.hs
module MapReduce
(
mapReduce
, simpleMapReduce
-- exported for convenience
-- , rnf
-- , rwhnf
) where
import Control.Parallel (pseq)
import Control.Parallel.Strategies
-- | Sequential map/reduce: apply the map function to every input element,
-- then hand the whole list of mapped values to the reduce function.
simpleMapReduce
  :: (a -> b)    -- ^ function applied to each input element
  -> ([b] -> c)  -- ^ function that collapses the mapped list into a result
  -> [a]         -- ^ input elements
  -> c
simpleMapReduce mapFunc reduceFunc input = reduceFunc (map mapFunc input)
-- | Parallel map/reduce.
--
-- The map function is applied to the input with 'parMap', evaluating each
-- mapped element with the given map strategy. The reduce function is then
-- applied to the mapped list and its result is evaluated with the reduce
-- strategy. 'pseq' makes sure evaluation of the mapped list is started
-- before the reduction is demanded.
mapReduce
  :: Strategy b    -- ^ evaluation strategy for each mapped element
  -> (a -> b)      -- ^ map function
  -> Strategy c    -- ^ evaluation strategy for the reduced result
  -> ([b] -> c)    -- ^ reduce function
  -> [a]           -- ^ input elements
  -> c
mapReduce mapStrat mapFunc reduceStrat reduceFunc input =
  let mapped  = parMap mapStrat mapFunc input
      reduced = reduceFunc mapped `using` reduceStrat
  in mapped `pseq` reduced
LineChunks.hs
module LineChunks
(
chunkedReadWith
) where
import Control.OldException (bracket, finally)
import Control.Monad (forM, liftM)
--import Control.Parallel.Strategies
import Control.DeepSeq (NFData, rnf)
import Data.Int
import qualified Data.ByteString.Lazy.Char8 as LB
import GHC.Conc (numCapabilities)
import System.IO
-- | Describes one contiguous region of a file.
data ChunkSpec = CS
  { chunkOffset :: !Int64  -- ^ absolute byte position where the chunk starts
  , chunkLength :: !Int64  -- ^ number of bytes belonging to the chunk
  } deriving (Eq, Show)
-- | Split a file into chunks, run a pure function over the chunk contents,
-- and close every handle that was opened for the chunks afterwards.
--
-- The result is forced with 'rnf' before the handles are closed: the chunks
-- are read lazily, so the data must be consumed while the handles are still
-- open. The handles are closed even if evaluating the result throws.
withChunks
  :: (NFData a)
  => (FilePath -> IO [ChunkSpec])  -- ^ computes the chunk boundaries
  -> ([LB.ByteString] -> a)        -- ^ consumes the chunk contents
  -> FilePath                      -- ^ file to process
  -> IO a
withChunks chunkFunc process path = do
  (chunks, handles) <- chunkedRead chunkFunc path
  let result   = process chunks
      closeAll = mapM_ hClose handles
  (rnf result `seq` return result) `finally` closeAll
-- | Process a file as a list of line-aligned chunks.
--
-- The chunk boundaries come from 'lineChunks', which is asked for four
-- chunks per capability ('numCapabilities'); the chunk contents are then
-- handed to the supplied function via 'withChunks'.
chunkedReadWith
  :: (NFData a)
  => ([LB.ByteString] -> a)  -- ^ consumes the chunk contents
  -> FilePath                -- ^ file to process
  -> IO a
chunkedReadWith = withChunks (lineChunks (numCapabilities * 4))
-- | Open one handle per chunk and return the lazily-read contents of every
-- chunk together with the handles backing them.
--
-- The handles are returned still open; closing them is left to the caller.
chunkedRead
  :: (FilePath -> IO [ChunkSpec])  -- ^ computes the chunk boundaries
  -> FilePath                      -- ^ file to read
  -> IO ([LB.ByteString], [Handle])
chunkedRead chunkFunc path = do
  specs <- chunkFunc path
  pairs <- mapM openChunk specs
  return (unzip pairs)
  where
    -- Open a fresh handle, position it at the start of the chunk and
    -- restrict the lazy contents to the chunk's length.
    openChunk spec = do
      h <- openFile path ReadMode
      hSeek h AbsoluteSeek (fromIntegral (chunkOffset spec))
      contents <- LB.hGetContents h
      return (LB.take (chunkLength spec) contents, h)
-- | Compute line-aligned chunk boundaries for a file.
--
-- The file size is divided by the requested number of chunks to get a
-- nominal chunk size. Starting from offset 0, each chunk is extended from
-- its nominal end up to and including the next newline, so every chunk
-- except possibly the last ends just after a @'\n'@. The last chunk runs to
-- the end of the file and may be empty. Because boundaries are moved forward
-- to newlines, the number of chunks returned can differ from the number
-- requested.
--
-- A requested chunk count below 1 is treated as 1.
lineChunks
  :: Int       -- ^ requested number of chunks
  -> FilePath  -- ^ file to inspect
  -> IO [ChunkSpec]
lineChunks numChunks path =
  bracket (openFile path ReadMode) hClose $ \h -> do
    totalSize <- fromIntegral `liftM` hFileSize h
    -- Clamp the divisor: a count of 0 would divide by zero and a negative
    -- count would yield a negative seek target.
    let chunkSize = totalSize `div` fromIntegral (max 1 numChunks)

        -- Build the chunk list for the part of the file starting at 'offset'.
        findChunks offset = do
          let target = offset + chunkSize
          hSeek h AbsoluteSeek (fromIntegral target)
          findNewline offset target

        -- Scan forward in 4096-byte reads from file position 'pos' looking
        -- for the newline that terminates the chunk starting at 'offset'.
        findNewline offset pos = do
          eof <- hIsEOF h
          if eof
            then return [CS offset (totalSize - offset)]
            else do
              bytes <- LB.hGet h 4096
              case LB.elemIndex '\n' bytes of
                Nothing -> findNewline offset (pos + LB.length bytes)
                Just n -> do
                  -- The next chunk starts right after the newline, so this
                  -- chunk's length follows without inspecting the result of
                  -- the recursive call.
                  let next = pos + n + 1
                  rest <- findChunks next
                  return (CS offset (next - offset) : rest)
    findChunks 0
LineCount.hs
module Main where
import Control.Monad (forM_)
import Data.Int (Int64)
import qualified Data.ByteString.Lazy.Char8 as LB
import System.Environment (getArgs)
import LineChunks (chunkedReadWith)
import MapReduce (mapReduce)
import Control.Parallel.Strategies
-- | Count the newline characters across all chunks.
--
-- Each chunk is counted via 'mapReduce', with both the per-chunk counts and
-- the final sum evaluated using 'rdeepseq'.
lineCount :: [LB.ByteString] -> Int64
lineCount chunks = mapReduce rdeepseq countNewlines rdeepseq sum chunks
  where
    countNewlines = LB.count '\n'
-- | Print the line count of every file named on the command line, one
-- @path: count@ line per file.
main :: IO ()
main = getArgs >>= mapM_ report
  where
    report path = do
      total <- chunkedReadWith lineCount path
      putStrLn (path ++ ": " ++ show total)