Drkcore

16 10 2010 Haskell RWH Tweet

Real World Haskell 24章 10

MapReduce、ただし、サンプルの通りでは動かない。

ProductName Real World Haskell―実戦で学ぶ関数型言語プログラミング
Bryan O'Sullivan,John Goerzen,Don Stewart
オライリージャパン / ¥ 3,990 ()
在庫あり。

Control.Parallel.Strategiesを読むと

  • NFDataはControl.DeepSeqに移った。
  • rnfはもはやStrategyではないので、かわりにrdeepseqを使え

とのことなのでそう書きなおしてみた。

$ time ./LineCount +RTS -N1 -RTS test.log 
test.log: 2146144

real    0m1.273s
user    0m0.709s
sys     0m0.318s
$ time ./LineCount +RTS -N2 -RTS test.log 
test.log: 2146144

real    0m0.652s
user    0m0.781s
sys     0m0.373s

お、速くなった。

MapReduce.hs

module MapReduce
    (
      mapReduce
    , simpleMapReduce
    -- exported for convenience
--    , rnf
--    , rwhnf
    ) where

import Control.Parallel (pseq)
import Control.Parallel.Strategies

-- | Sequential map/reduce: apply the map function to every input element,
-- then hand the resulting list to the reduce function.
simpleMapReduce
    :: (a -> b)      -- ^ function applied to each input element
    -> ([b] -> c)    -- ^ function that folds the mapped list into a result
    -> [a]           -- ^ input elements
    -> c
simpleMapReduce mapFunc reduceFunc input =
    reduceFunc [mapFunc x | x <- input]

-- | Map/reduce with caller-supplied evaluation strategies.
--
-- The map phase is run through 'parMap' with the map strategy; the reduce
-- phase is evaluated with the reduce strategy. 'pseq' makes sure the mapped
-- list is evaluated (to weak head normal form) before the reduced value is
-- returned.
mapReduce
    :: Strategy b    -- ^ evaluation strategy for each mapped element
    -> (a -> b)      -- ^ map function
    -> Strategy c    -- ^ evaluation strategy for the reduced result
    -> ([b] -> c)    -- ^ reduce function
    -> [a]           -- ^ input elements
    -> c
mapReduce mapStrat mapFunc reduceStrat reduceFunc input =
    let mapped  = parMap mapStrat mapFunc input
        reduced = reduceFunc mapped `using` reduceStrat
    in  pseq mapped reduced

LineChunks.hs

module LineChunks
    (
     chunkedReadWith
    ) where

import Control.OldException (bracket, finally)
import Control.Monad (forM, liftM)
--import Control.Parallel.Strategies 
import Control.DeepSeq (NFData, rnf)
import Data.Int
import qualified Data.ByteString.Lazy.Char8 as LB
import GHC.Conc (numCapabilities)
import System.IO

-- | Location of one chunk inside a file, expressed as a byte range.
data ChunkSpec = CS
    { chunkOffset :: !Int64  -- ^ absolute byte offset at which the chunk starts
    , chunkLength :: !Int64  -- ^ number of bytes in the chunk
    }
    deriving (Eq, Show)

-- | Read a file as a list of chunks, run a pure function over them, and
-- close every handle that was opened for the chunks afterwards.
--
-- The result is fully forced with 'rnf' before it is returned: the chunks
-- are read lazily, so all reading has to happen before the handles are
-- closed by the 'finally' clean-up.
withChunks :: (NFData a) =>
              (FilePath -> IO [ChunkSpec])  -- ^ computes the chunk layout of the file
           -> ([LB.ByteString] -> a)        -- ^ pure processing of the chunk contents
           -> FilePath                      -- ^ file to read
           -> IO a
withChunks chunkFunc process path =
  chunkedRead chunkFunc path >>= \(chunks, handles) ->
    let result      = process chunks
        forceResult = rnf result `seq` return result
        closeAll    = mapM_ hClose handles
    in  forceResult `finally` closeAll

-- | Process a file chunk by chunk, splitting it on line boundaries into
-- four chunks per available capability ('numCapabilities').
chunkedReadWith :: (NFData a) =>
                   ([LB.ByteString] -> a) -> FilePath -> IO a
chunkedReadWith func path = withChunks splitter func path
  where
    -- Ask for more chunks than capabilities.
    chunkCount = numCapabilities * 4
    splitter   = lineChunks chunkCount


-- | Open the file once per chunk and return the lazily read contents of
-- every chunk together with the handle backing it.
--
-- The handles are returned still open; the caller is responsible for
-- closing them once the chunk contents have been consumed.
chunkedRead :: (FilePath -> IO [ChunkSpec])
          -> FilePath
          -> IO ([LB.ByteString], [Handle])
chunkedRead chunkFunc path = do
  specs <- chunkFunc path
  opened <- mapM openChunk specs
  return (unzip opened)
  where
    -- Open a fresh handle, position it at the chunk start, and expose at
    -- most 'chunkLength' bytes of the lazily read contents.
    openChunk spec = do
      h <- openFile path ReadMode
      hSeek h AbsoluteSeek (fromIntegral (chunkOffset spec))
      contents <- LB.hGetContents h
      return (LB.take (chunkLength spec) contents, h)

-- | Split a file into chunks of roughly equal size, each of which ends just
-- after a newline (the last chunk runs to the end of the file).
--
-- The first argument is the desired number of chunks. Values below 1 are
-- treated as 1 so that the chunk-size division can never divide by zero.
-- The result is never empty: an empty file yields a single zero-length
-- chunk.
lineChunks :: Int -> FilePath -> IO [ChunkSpec]
lineChunks numChunks path =
  bracket (openFile path ReadMode) hClose $ \h -> do
    totalSize <- fromIntegral `liftM` hFileSize h
    let chunkSize = totalSize `div` fromIntegral (max 1 numChunks)

        -- Build the chunk starting at 'offset' (and all chunks after it):
        -- jump ahead by the nominal chunk size, then scan for a newline.
        findChunks offset = do
          let target = offset + chunkSize
          hSeek h AbsoluteSeek (fromIntegral target)
          findNewline offset target

        -- Scan forward in 4096-byte blocks from file position 'pos' until
        -- a newline or end of file is found.
        findNewline offset pos = do
          eof <- hIsEOF h
          if eof
            then return [CS offset (totalSize - offset)]
            else do
              bytes <- LB.hGet h 4096
              case LB.elemIndex '\n' bytes of
                Just n -> do
                  -- The next chunk starts right after the newline, so this
                  -- chunk's length follows directly from that position; no
                  -- need to inspect the head of the recursive result.
                  let next = pos + n + 1
                  rest <- findChunks next
                  return (CS offset (next - offset) : rest)
                Nothing -> findNewline offset (pos + LB.length bytes)
    findChunks 0

LineCount.hs

module Main where

import Control.Monad (forM_)
import Data.Int (Int64)
import qualified Data.ByteString.Lazy.Char8 as LB
import System.Environment (getArgs)
import LineChunks (chunkedReadWith)
import MapReduce (mapReduce)
import Control.Parallel.Strategies

-- | Count the newline characters across all chunks: each chunk is counted
-- by the map phase and the per-chunk counts are summed by the reduce phase.
-- Both phases use the 'rdeepseq' strategy.
lineCount :: [LB.ByteString] -> Int64
lineCount chunks = mapReduce rdeepseq newlinesIn rdeepseq sum chunks
  where
    newlinesIn = LB.count '\n'

-- | For every file named on the command line, print the path followed by
-- its line count.
main :: IO ()
main = getArgs >>= mapM_ reportLineCount
  where
    reportLineCount path =
      chunkedReadWith lineCount path >>= \numLines ->
        putStrLn (path ++ ": " ++ show numLines)

About

  • もう5年目(wishlistありマス♡)
  • 最近はPythonとDeepLearning
  • 日本酒自粛中
  • ドラムンベースからミニマルまで
  • ポケモンGOゆるめ

Tag

Python Deep Learning javascript chemoinformatics Emacs sake and more...

Ad

© kzfm 2003-2021