-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathMergeSort.hs
105 lines (91 loc) · 3.16 KB
/
MergeSort.hs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
module Main
( main
, sortMergeCombined
, sortMergeChunks
)
where
import Control.Monad (void)
import Data.Function ((&))
import Streamly.Data.Array (Array)
import Streamly.Data.Stream (Stream)
import qualified Streamly.Data.Array as Array
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream.Prelude as Stream
import qualified Streamly.Data.StreamK as K
import qualified Streamly.Internal.Data.Stream as Stream (reduceIterateBfs)
input :: [Int]
input = [1000000,999999..1]
chunkSize :: Int
chunkSize = 32*1024
streamChunk :: Array Int -> Stream IO Int
streamChunk =
K.toStream
. K.sortBy compare
. K.fromStream
. Array.read
sortChunk :: Array Int -> IO (Array Int)
sortChunk = Stream.fold Array.create . streamChunk
-------------------------------------------------------------------------------
-- Stream the unsorted chunks and sort, merge those streams.
-------------------------------------------------------------------------------
-- In contrast to sortMergeSeparate this uses much more peak memory because all
-- the streams are open in memory at the same time.
sortMergeCombined :: (Array Int -> Stream IO Int) -> IO ()
sortMergeCombined f =
Stream.fromList input
& Array.chunksOf chunkSize
& K.fromStream
& K.mergeMapWith (K.mergeBy compare) (K.fromStream . f)
& K.toStream
& Stream.fold Fold.drain
-------------------------------------------------------------------------------
-- First create a stream of sorted chunks, then stream sorted chunks and merge
-- the streams
-------------------------------------------------------------------------------
sortMergeSeparate ::
( (Array Int -> IO (Array Int))
-> Stream IO (Array Int)
-> Stream IO (Array Int)
)
-> IO ()
sortMergeSeparate f =
Stream.fromList input
& Array.chunksOf chunkSize
& f sortChunk
& K.fromStream
& K.mergeMapWith (K.mergeBy compare) (K.fromStream . Array.read)
& K.toStream
& Stream.fold Fold.drain
-------------------------------------------------------------------------------
-- First create a stream of sorted chunks, merge sorted chunks into sorted
-- chunks recursively.
-------------------------------------------------------------------------------
reduce :: Array Int -> Array Int -> IO (Array Int)
reduce arr1 arr2 =
Stream.mergeBy
compare
(Array.read arr1)
(Array.read arr2)
& Stream.fold Array.create
sortMergeChunks ::
( (Array Int -> IO (Array Int))
-> Stream IO (Array Int)
-> Stream IO (Array Int)
)
-> IO ()
sortMergeChunks f =
Stream.fromList input
& Array.chunksOf chunkSize
& f sortChunk
& Stream.reduceIterateBfs reduce
& void
-- | Divide a stream in chunks, sort the chunks and merge them.
main :: IO ()
main = do
-- Sorted in best performing first order
sortMergeSeparate (Stream.parMapM id)
-- sortMergeSeparate Stream.mapM
-- sortMergeCombined (Stream.parEval id . streamChunk)
-- sortMergeCombined streamChunk
-- sortMergeChunks (Stream.parMapM id)
-- sortMergeChunks Stream.mapM