Skip to content

Commit 6001f11

Browse files
author
Marshall Bockrath-Vandegrift
committed
Enough plumbing for an example tracer.
1 parent 83ebda7 commit 6001f11

File tree

5 files changed

+227
-4
lines changed

5 files changed

+227
-4
lines changed

.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -10,3 +10,4 @@ pom.xml.asc
1010
.lein-failures
1111
.lein-plugins
1212
.lein-repl-history
13+
/tmp/
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
(ns parkour.examples.word-count
2+
(:require [clojure.string :as str]
3+
[clojure.core.reducers :as r]
4+
[parkour.mapreduce :as mr]
5+
[parkour.writable :as w]
6+
[parkour.fs :as fs]
7+
[parkour.util :refer [returning]])
8+
(:import [org.apache.hadoop.io IntWritable Text]
9+
[org.apache.hadoop.mapreduce.lib.input
10+
TextInputFormat FileInputFormat]
11+
[org.apache.hadoop.mapreduce.lib.output
12+
TextOutputFormat FileOutputFormat]))
13+
14+
(defn mapper
  "Word-count map task.  Given `conf` (accepted but unused here), returns a
function of the task `input` and `output` which splits each input value on
runs of whitespace and emits a `[word 1]` tuple for every non-empty token."
  {::mr/output [Text IntWritable]}
  [conf]
  (fn [input output]
    ;; One Text/IntWritable pair is mutated and re-emitted for every token.
    (let [word (Text.), one (IntWritable. 1)]
      (->> (mr/vals input)
           (r/map w/unwable)
           (r/mapcat #(str/split % #"\s+"))
           ;; `str/split` produces an empty string for blank lines and for
           ;; lines with leading whitespace; those are not words.
           (r/remove empty?)
           (r/map (fn [s] (returning [word one] (w/wable word s))))
           (r/reduce mr/emit-keyval output)))))
24+
25+
(defn reducer
  "Word-count reduce (and combine) task.  Given `conf` (accepted but unused
here), returns a function of the task `input` and `output` which sums the
counts grouped under each word and emits a `[word total]` tuple per group."
  ;; The attr-map must precede the parameter vector to become var metadata;
  ;; placed after it, it is treated as part of the function body and
  ;; `::mr/output` never reaches `set-reducer-var`.
  {::mr/output [Text IntWritable]}
  [conf]
  (fn [input output]
    ;; A single IntWritable is mutated and re-emitted for every group.
    (let [total (IntWritable.)]
      (->> (mr/keyvalgroups input)
           (r/map (fn [[word counts]]
                    (returning [word total]
                      (->> (r/map w/unwable counts)
                           (r/reduce + 0)
                           (w/wable total)))))
           (r/reduce mr/emit-keyval output)))))
37+
38+
(defn -main
  "Command-line entry point.  Takes the input path and output path as the
first two arguments, configures a word-count job over them, then runs the job
and waits for it to complete."
  [& args]
  (let [[inpath outpath] args
        job (mr/job)]
    ;; Task functions: the reducer doubles as the combiner.
    (mr/set-mapper-var job #'mapper)
    (mr/set-combiner-var job #'reducer)
    (mr/set-reducer-var job #'reducer)
    ;; Plain-text input and output.
    (.setInputFormatClass job TextInputFormat)
    (.setOutputFormatClass job TextOutputFormat)
    (FileInputFormat/addInputPath job (fs/path inpath))
    (FileOutputFormat/setOutputPath job (fs/path outpath))
    (.waitForCompletion job true)))

src/clojure/parkour/fs.clj

+60
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
(ns parkour.fs
2+
(:require [clojure.java.io :as io])
3+
(:import [java.net URI]
4+
[java.io File]
5+
[org.apache.hadoop.conf Configuration]
6+
[org.apache.hadoop.fs FileSystem Path]))
7+
8+
(defprotocol Coercions
  "Implementation protocol behind the public `path` and `uri` functions."
  (^org.apache.hadoop.fs.Path -path [x]
    "Coerce argument to a Path; private implementation.")
  (^java.net.URI -uri [x]
    "Coerce argument to a URI; private implementation."))
13+
14+
(defn path
  "Coerce argument(s) to a Path, resolving successive arguments against base."
  {:tag `Path}
  ([x]
     (-path x))
  ([x y]
     ;; `y` is stringified and treated as a child of the coerced base.
     (Path. (-path x) (str y)))
  ([x y & more]
     ;; Fold the remaining segments onto the base one at a time.
     (reduce path (path x y) more)))
20+
21+
(defn uri
  "Coerce argument(s) to a URI, resolving successive arguments against base.
With one argument, coerces it via the `Coercions` protocol.  With more, each
successive argument is stringified and resolved relative to the preceding
result, treating that result as a directory."
  {:tag `URI}
  ([x] (-uri x))
  ([x y]
     (let [^URI base (-uri x)
           ;; Append a trailing slash so `y` resolves *under* base rather
           ;; than replacing its last segment.  The raw (still-escaped) path
           ;; is required here: `.resolve` re-parses its String argument, so
           ;; the decoded `.getPath` would throw on characters such as
           ;; spaces and would mis-read a literal percent sign.
           dir (.resolve base (str (.getRawPath base) "/"))]
       (.resolve dir (str y))))
  ([x y & more] (apply uri (uri x y) more)))
29+
30+
(defn path-fs
  "Hadoop filesystem for the path `p`."
  {:tag `FileSystem}
  ([p]
     ;; No configuration supplied: use a freshly constructed one.
     (path-fs (Configuration.) p))
  ([conf p]
     (let [target (path p)]
       (.getFileSystem target ^Configuration conf))))
35+
36+
(extend-protocol Coercions
  String
  (-path [x]
    ;; Strings with a `file:` prefix are routed through the File coercion;
    ;; anything else is handed to the Path constructor unchanged.
    (if-not (.startsWith x "file:")
      (Path. x)
      (-path (io/file (subs x 5)))))
  (-uri [x]
    (let [parsed (URI. x)
          scheme (.getScheme parsed)]
      (cond
        (= "file" scheme)
        (.toURI (io/file parsed))

        ;; Scheme-less strings are qualified against their path's filesystem.
        (nil? scheme)
        (let [p (Path. x)]
          (.toUri (.makeQualified p (path-fs p))))

        :else
        parsed)))

  Path
  (-path [x] x)
  (-uri [x]
    (let [fs (path-fs x)]
      (.toUri (.makeQualified x fs))))

  URI
  (-path [x] (Path. x))
  (-uri [x] x)

  File
  (-path [x]
    (let [absolute (.getAbsolutePath x)]
      (Path. (str "file:" absolute))))
  (-uri [x] (.toURI x)))

src/clojure/parkour/mapreduce.clj

+67-4
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,16 @@
44
[clojure.core.reducers :as r]
55
[clojure.core.protocols :as ccp]
66
[clojure.string :as str]
7+
[clojure.reflect :as reflect]
8+
[parkour.writable :as w]
79
[parkour.util :refer [returning]]
810
[parkour.reducers :as pr])
911
(:import [java.util Comparator]
1012
[clojure.lang IPersistentCollection]
13+
[org.apache.hadoop.conf Configuration]
1114
[org.apache.hadoop.io NullWritable]
1215
[org.apache.hadoop.mapreduce
13-
MapContext ReduceContext TaskInputOutputContext]))
16+
Job MapContext ReduceContext TaskInputOutputContext]))
1417

1518
(defprotocol MRSource
1619
(keyvals [source] "")
@@ -130,6 +133,66 @@
130133
(returning sink (.write sink (NullWritable/get) val)))
131134

132135
IPersistentCollection
133-
(emit-keyval [sink keyval] (conj sink keyval))
134-
(emit-key [sink key] (conj sink [key nil]))
135-
(emit-val [sink val] (conj sink [nil val])))
136+
(emit-keyval [sink keyval] (conj sink (mapv w/clone keyval)))
137+
(emit-key [sink key] (conj sink [(w/clone key) nil]))
138+
(emit-val [sink val] (conj sink [nil (w/clone val)])))
139+
140+
(def ^:private job-factory-method?
  "Truthy when the `Job` class on the classpath has a member named
`getInstance`."
  (let [members (:members (reflect/type-reflect Job))]
    (some (fn [member] (= 'getInstance (:name member))) members)))
142+
143+
(defmacro ^:private make-job
  "Expand to a `Job` construction form over `args`: a call to the static
`Job/getInstance` when `job-factory-method?` is truthy, otherwise a direct
constructor call."
  [& args]
  (let [ctor (if job-factory-method? `Job/getInstance `Job.)]
    `(~ctor ~@args)))
145+
146+
(defn job
  "Create a new `Job`.  With no arguments, builds one with no explicit
configuration.  With one argument, builds it from that argument: a `Job`
contributes a copy of its configuration, anything else is used directly as a
`Configuration`."
  {:tag `Job}
  ([] (make-job))
  ([conf]
     (if-not (instance? Job conf)
       (make-job ^Configuration conf)
       (let [base (.getConfiguration ^Job conf)]
         (make-job (Configuration. base))))))
153+
154+
(defn mapper!
  "Register `var` and `args` as the next mapper in the configuration of `job`.
Reads the slot index from `parkour.mapper.next` (default 0), stores the
incremented index back, records the printed var and args under
`parkour.mapper.<i>.var` and `parkour.mapper.<i>.args`, and returns the class
named `parkour.hadoop.Mappers$_<i>`."
  [^Job job var & args]
  (let [conf (.getConfiguration job)
        slot (.getInt conf "parkour.mapper.next" 0)
        var-key (format "parkour.mapper.%d.var" slot)
        args-key (format "parkour.mapper.%d.args" slot)]
    (.setInt conf "parkour.mapper.next" (inc slot))
    (.set conf var-key (pr-str var))
    (.set conf args-key (pr-str args))
    (Class/forName (format "parkour.hadoop.Mappers$_%d" slot))))
163+
164+
(defn reducer!
  "Register `var` and `args` as the next reducer in the configuration of
`job`.  Reads the slot index from `parkour.reducer.next` (default 0), stores
the incremented index back, records the printed var and args under
`parkour.reducer.<i>.var` and `parkour.reducer.<i>.args`, and returns the
class named `parkour.hadoop.Reducers$_<i>`."
  [^Job job var & args]
  (let [conf (.getConfiguration job)
        slot (.getInt conf "parkour.reducer.next" 0)
        var-key (format "parkour.reducer.%d.var" slot)
        args-key (format "parkour.reducer.%d.args" slot)]
    (.setInt conf "parkour.reducer.next" (inc slot))
    (.set conf var-key (pr-str var))
    (.set conf args-key (pr-str args))
    (Class/forName (format "parkour.hadoop.Reducers$_%d" slot))))
173+
174+
(defn partitioner!
  "Record `var` and `args` as the partitioner in the configuration of `job`,
under `parkour.partitioner.var` and `parkour.partitioner.args`, and return
the `parkour.hadoop.Partitioner` class."
  [^Job job var & args]
  (let [conf (.getConfiguration job)]
    (.set conf "parkour.partitioner.var" (pr-str var))
    (.set conf "parkour.partitioner.args" (pr-str args))
    parkour.hadoop.Partitioner))
181+
182+
(defn set-mapper-var
  "Install `var` (with `args`) as the mapper of `job`.  When the var carries
`::output` metadata, its first and second elements, if present, are set as
the map output key and value classes."
  [^Job job var & args]
  (let [[kclass vclass] (::output (meta var))
        impl (apply mapper! job var args)]
    (.setMapperClass job impl)
    (when kclass
      (.setMapOutputKeyClass job kclass))
    (when vclass
      (.setMapOutputValueClass job vclass))))
188+
189+
(defn set-combiner-var
  "Install `var` (with `args`) as the combiner of `job`, registering it
through `reducer!`."
  [^Job job var & args]
  (let [impl (apply reducer! job var args)]
    (.setCombinerClass job impl)))
192+
193+
(defn set-reducer-var
  "Install `var` (with `args`) as the reducer of `job`.  When the var carries
`::output` metadata, its first and second elements, if present, are set as
the job output key and value classes."
  [^Job job var & args]
  (let [[kclass vclass] (::output (meta var))
        impl (apply reducer! job var args)]
    (.setReducerClass job impl)
    (when kclass
      (.setOutputKeyClass job kclass))
    (when vclass
      (.setOutputValueClass job vclass))))

src/clojure/parkour/writable.clj

+50
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
(ns parkour.writable
2+
(:require [parkour.util :refer [returning]])
3+
(:import [java.lang.reflect Constructor]
4+
[clojure.lang IPersistentVector]
5+
[org.apache.hadoop.io
6+
IntWritable LongWritable NullWritable Text Writable]))
7+
8+
(defprotocol Wable
  "Values which can be wrapped in a fresh Writable."
  (-wable [obj]
    "Wrap `obj` in a type-specific Writable."))
10+
11+
(defprotocol Rewable
  "Writables which can be mutated in place to wrap a new value."
  (-rewable [wobj obj]
    "Mutate Writable `wobj` to wrap `obj`."))
13+
14+
(defn wable
  "With one argument, wrap `obj` via `-wable`.  With two, apply `-rewable`
to `wobj` and `obj`, yielding `wobj`."
  ([obj]
     (-wable obj))
  ([wobj obj]
     (returning wobj
       (-rewable wobj obj))))
17+
18+
(defprotocol Unwable
  "Objects which can be unwrapped from their Writable form."
  (-unwable [wobj]
    "Unwrap Writable `wobj` in a type-specific fashion."))
20+
21+
(defn unwable
  "Unwrap `wobj` via the `Unwable` protocol."
  [wobj]
  (-unwable wobj))
23+
24+
(extend-protocol Wable
  ;; Already a Writable: returned as-is.
  Writable
  (-wable [obj] obj)

  ;; Vectors are wrapped element-wise.
  IPersistentVector
  (-wable [obj] (mapv wable obj))

  nil
  (-wable [obj] (NullWritable/get))

  String
  (-wable [obj] (Text. obj))

  Integer
  (-wable [obj] (IntWritable. obj))

  Long
  (-wable [obj] (LongWritable. obj)))
31+
32+
(extend-protocol Rewable
  ;; Nothing to store: the value is ignored.
  NullWritable
  (-rewable [wobj obj] nil)

  Text
  (-rewable [wobj obj] (.set wobj ^String obj))

  IntWritable
  (-rewable [wobj obj] (.set wobj obj))

  LongWritable
  (-rewable [wobj obj] (.set wobj obj)))
37+
38+
(extend-protocol Unwable
  ;; Fallback: any other object is returned unchanged.
  Object
  (-unwable [wobj] wobj)

  ;; Vectors are unwrapped element-wise.
  IPersistentVector
  (-unwable [wobj] (mapv unwable wobj))

  NullWritable
  (-unwable [wobj] nil)

  Text
  (-unwable [wobj] (.toString wobj))

  IntWritable
  (-unwable [wobj] (.get wobj))

  LongWritable
  (-unwable [wobj] (.get wobj)))
45+
46+
(defn clone
  "Return an independent copy of `wobj`.  Non-Writable values and
`NullWritable` are returned as-is; any other Writable is copied by creating a
new instance of its class and re-wrapping the unwrapped value."
  [wobj]
  (cond
    (not (instance? Writable wobj))
    wobj

    ;; NullWritable is a singleton with no accessible constructor, so
    ;; `.newInstance` would throw; it carries no state to copy anyway.
    (instance? NullWritable wobj)
    wobj

    :else
    (wable (-> wobj .getClass .newInstance) (unwable wobj))))

0 commit comments

Comments
 (0)