@@ -15,13 +15,14 @@ example:
  ...
  :dependencies [...
                 [org.codehaus.jsr166-mirror/jsr166y "1.7.0"]
-                [com.damballa/parkour "0.5.4"]
+                [com.damballa/parkour "0.6.0"]
                 ...]

  :profiles {...
             :provided
             {:dependencies
-             [[org.apache.hadoop/hadoop-core "1.2.1"]]}
+             [[org.apache.hadoop/hadoop-client "2.4.1"]
+              [org.apache.hadoop/hadoop-common "2.4.1"]]}
             ...}
  ...)
```
@@ -35,7 +36,7 @@ following are the key ideas Parkour introduces or re-purposes.

### MapReduce via `reducers` (and lazy seqs)

-The Clojure 1.5 `clojure.core.reducers` standard library namespace narrows the
+The Clojure >= 1.5 `clojure.core.reducers` standard library namespace narrows the
idea of a “collection” to “something which may be `reduce`d.” This abstraction
allows the sequences of key-value tuples processed in Hadoop MapReduce tasks to
be represented as collections. MapReduce tasks become functions over
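
To make the hunk above concrete, here is a small, purely local sketch (not part of the diff) of a MapReduce-style task written as a function over a reducible collection of key/value tuples; all names are illustrative:

```clj
;; A "map task" as a plain function over any reducible collection of
;; [key value] tuples; runnable at the REPL with no Hadoop involved.
(require '[clojure.core.reducers :as r]
         '[clojure.string :as str])

(defn upper-vals-m
  [coll]
  (r/map (fn [[k v]] [k (str/upper-case v)]) coll))

(into [] (upper-vals-m [[0 "apple"] [1 "banana"]]))
;;=> [[0 "APPLE"] [1 "BANANA"]]
```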
@@ -67,7 +68,10 @@ Hadoop and Java Hadoop libraries typically contain a number of static methods
for configuring Hadoop `Job` objects, handling such tasks as setting input &
output paths, serialization formats, etc. Parkour codifies these as
_configuration steps_, or _csteps_: functions which accept a `Job` object as
-their single argument and modify that `Job` to apply some configuration.
+their single argument and modify that `Job` to apply some configuration. The
+crucial difference is that csteps are themselves first-class values, allowing
+dynamic assembly of job configurations from the composition of opaque
+configuration elements.

In practice, Parkour configuration steps are implemented via a protocol
(`parkour.cstep/ConfigStep`) and associated public application function
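
As a rough sketch of the cstep idea described in this hunk (not Parkour's own code; the property name and the vector composition are illustrative assumptions):

```clj
;; A cstep as a plain function: it accepts a Hadoop Job and mutates it.
(import 'org.apache.hadoop.mapreduce.Job)

(defn set-example-property
  [^Job job]
  (.set (.getConfiguration job) "example.property" "some-value"))

;; Because csteps are first-class values, they can be collected and passed
;; around like any other data before being applied to a Job.
(def example-csteps
  [set-example-property
   (fn [^Job job] (.setNumReduceTasks job 1))])
```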
@@ -84,8 +88,9 @@ In practice, Parkour configuration steps are implemented via a protocol
A Parkour distributed sequence configures a job for input from a particular
location and input format, reifying a function calling the underlying Hadoop
`Job#setInputFormatClass` etc methods. In addition to `ConfigStep`, dseqs also
-implement the core Clojure `CollReduce` protocol, allowing any Hadoop job input
-source to also be treated as a local reducible collection.
+implement the core Clojure `CollReduce` and `CollFold` protocols, allowing any
+Hadoop job input source to also be treated as a local reducible and foldable
+collection.

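A hedged sketch of what that local-collection behavior looks like in practice (the path is an assumption, and the `text` and `r` aliases follow the word-count example later in this diff):

```clj
;; Locally reducing a text dseq: each element arrives as an unwrapped
;; [key value] vector -- for text input, a file offset and a line of text.
(->> (text/dseq "example/input.txt")   ;; illustrative path
     (r/map second)                    ;; keep just the line text
     (into []))
```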
#### Distributed sinks (dsinks)

@@ -121,46 +126,35 @@ allows adding arbitrary configuration steps to a job node in any stage.
Here’s the complete classic “word count” example, written using Parkour:

```clj
-(ns parkour.examples.word-count
+(ns parkour.example.word-count
  (:require [clojure.string :as str]
            [clojure.core.reducers :as r]
            [parkour (conf :as conf) (fs :as fs) (mapreduce :as mr)
-             ,       (graph :as pg) (tool :as tool)]
+             ,       (graph :as pg) (toolbox :as ptb) (tool :as tool)]
            [parkour.io (text :as text) (seqf :as seqf)])
  (:import [org.apache.hadoop.io Text LongWritable]))

-(defn mapper
-  {::mr/source-as :vals}
-  [input]
-  (->> input
+(defn word-count-m
+  [coll]
+  (->> coll
       (r/mapcat #(str/split % #"\s+"))
       (r/map #(-> [% 1]))))

-(defn reducer
-  {::mr/source-as :keyvalgroups}
-  [input]
-  (r/map (fn [[word counts]]
-           [word (r/reduce + 0 counts)])
-         input))
-
(defn word-count
-  [conf workdir lines]
-  (let [wc-path (fs/path workdir "word-count")
-        wc-dsink (seqf/dsink [Text LongWritable] wc-path)]
-    (-> (pg/input lines)
-        (pg/map #'mapper)
-        (pg/partition [Text LongWritable])
-        (pg/combine #'reducer)
-        (pg/reduce #'reducer)
-        (pg/output wc-dsink)
-        (pg/execute conf "word-count")
-        first)))
+  [conf lines]
+  (-> (pg/input lines)
+      (pg/map #'word-count-m)
+      (pg/partition [Text LongWritable])
+      (pg/combine #'ptb/keyvalgroups-r #'+)
+      (pg/output (seqf/dsink [Text LongWritable]))
+      (pg/fexecute conf `word-count)))

(defn tool
-  [conf & args]
-  (let [[workdir & inpaths] args
-        lines (apply text/dseq inpaths)]
-    (->> (word-count conf workdir lines) (into {}) prn)))
+  [conf & inpaths]
+  (->> (apply text/dseq inpaths)
+       (word-count conf)
+       (into {})
+       (prn)))

(defn -main
  [& args] (System/exit (tool/run tool args)))
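
For reference, a quick REPL-level check of the new `word-count-m` function on plain local data (no Hadoop involved), assuming the namespace above is loaded:

```clj
(into [] (word-count-m ["apple banana" "banana"]))
;;=> [["apple" 1] ["banana" 1] ["banana" 1]]
```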
@@ -170,8 +164,8 @@ Let’s walk through some important features of this example.

### Task vars & adapters

-The remote task vars (the arguments to the `map`, `combine`, and `reduce` calls)
-have complete control over execution of their associated tasks. The underlying
+The remote task vars (the arguments to the `map` and `combine` calls) have
+complete control over execution of their associated tasks. The underlying
interface Parkour exposes models the Hadoop `Mapper` and `Reducer` classes as
higher-order functions, with construction/configuration invoking the initial
function, then task-execution invoking the function returned by the former.
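
A minimal sketch of that higher-order shape (not Parkour's exact raw interface; names are illustrative): the outer function runs at construction/configuration time, and the function it returns runs over the task input.

```clj
(require '[clojure.core.reducers :as r])

(defn example-raw-task
  [conf]                        ;; invoked at construction/configuration time
  (fn [input]                   ;; invoked at task execution, over the local input
    (r/map identity input)))    ;; must return a reducible output collection
```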
@@ -187,23 +181,28 @@ Parkour-Hadoop interface.
### Inputs as collections

In the default Hadoop Java interface, Hadoop calls a user-supplied method for
-each input tuple. Parkour instead calls the task function with the entire set
-of local input tuples as a single reducible collection, and expects a reducible
-output collection as the result.
-
-The input collections are directly reducible as vectors of key/value pairs, but
-the `parkour.mapreduce` namespace contains functions to efficiently reshape the
-task-inputs, including `vals` to access just the input values and (reduce-side)
-`keyvalgroups` to access grouping keys and grouped sub-collections of values.
-This model also allows access to more esoteric shapes generally not considered
-available from the raw Java interface, such as `keykeygroups`. These functions
-may be invoked directly, or passed to the `collfn` adapter via `::mr/source-as`
-metadata as in the example.
-
-Parkour also defaults to emitting the result collection as key/value pairs, but
-`pakour.mapreduce` contains a `sink-as` function (and supports `collfn` adapter
-`::mr/sink-as` metadata) for specifying alternative shapes for task output. The
-`sink` function allows explicit sinking to context objects or other sinks.
+each input tuple, which is then expected to write zero or more output tuples.
+Parkour instead calls task functions with the entire set of local input tuples
+as a single reducible collection, and expects a reducible output collection as
+the result.
+
+In the underlying Hadoop interfaces, all input and output happens in terms of
+key/value pairs. Parkour’s input collections are directly reducible as vectors
+of those key/value pairs, but also support efficient “reshaping” to iterate over
+just the relevant data for a particular task. These shapes include `vals` to
+access just the input values and (reduce-side) `keyvalgroups` to access grouping
+keys and grouped sub-collections of values. The `collfn` adapter allows
+specifying these shapes as keywords via `::mr/source-as` metadata on task vars.
+
+Parkour usually defaults to emitting the result collection as key/value pairs,
+but the `collfn` adapter supports `::mr/sink-as` metadata for specifying
+alternative shapes for task output.
+
+Additionally, dseqs and dsinks may supply different default input and output
+shapes. The text dseq used in the word count example specifies `vals` as the
+default, allowing unadorned task vars to receive the input collection as a
+collection of the input text lines (versus the underlying Hadoop tuples of file
+offset and text line).

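As a sketch of how such shape metadata looks on a task var (mirroring the reduce-side function removed from the word-count example; the `mr` and `r` aliases follow that example's namespace, and the `:keyvals` sink shape is an assumption):

```clj
(defn sum-counts-r
  {::mr/source-as :keyvalgroups   ;; receive [grouping-key values] pairs
   ::mr/sink-as :keyvals}         ;; emit [key value] pairs
  [coll]
  (r/map (fn [[word counts]]
           [word (r/reduce + 0 counts)])
         coll))
```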
### Automatic wrapping & unwrapping

@@ -222,10 +221,10 @@ with Hadoop’s serialization containers.

### Results

-The return value of the `execute` function is a vector of dseqs for the job
-graph leaf node results. These dseqs may be consumed locally as in the example,
-or used as inputs for additional jobs. When locally `reduce`d, dseqs yield
-key-value vectors of the `unwrap`ed values of the objects produced by the
-backing Hadoop input format. The `parkour.io.dseq/source-for` function can
-provide direct access to the raw wrapper objects, as well as allowing dseqs to
-be realized as lazy sequences instead of reducers.
+The return value of the `fexecute` function is a dseq for the job graph leaf
+node result. That dseq may be consumed locally as in the example, or used as
+the input for additional jobs. When locally `reduce`d, dseqs yield key-value
+vectors of the `unwrap`ed values of the objects produced by the backing Hadoop
+input format. The `parkour.io.dseq/source-for` function can provide direct
+access to the raw wrapper objects, as well as allowing dseqs to be realized as
+lazy sequences instead of reducers.
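
A short sketch of consuming that result dseq locally, assuming `conf` and `lines` as in the word-count example above:

```clj
(let [result (word-count conf lines)]   ;; `fexecute` returns a single dseq
  (into {} result))                     ;; locally reduce into a map of counts
;; For the raw wrapper objects or a lazy-seq view, see
;; `parkour.io.dseq/source-for`, as described above.
```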