Skip to content

Commit db42b0a

Browse files
committed
Decomplect dvals into dcpaths and dvals.
1 parent 5f66e2a commit db42b0a

File tree

2 files changed

+91
-56
lines changed

2 files changed

+91
-56
lines changed

resources/data_readers.clj

+2-1
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,6 @@
22
hadoop.fs/path parkour.fs/path,
33
hadoop.mapreduce/job parkour.mapreduce/job,
44
java.net/uri parkour.fs/uri,
5-
parkour/dval parkour.io.dval/distcache-dval,
5+
parkour/dcpath parkour.io.dval/dcpath-reader,
6+
parkour/dval parkour.io.dval/dval-reader,
67
}

src/clojure/parkour/io/dval.clj

+89-55
Original file line numberDiff line numberDiff line change
@@ -6,17 +6,42 @@
66
[parkour.util :as util :refer [doto-let]])
77
(:import [java.io Writer]
88
[java.net URI]
9-
[clojure.lang IDeref IPending]
9+
[clojure.lang IDeref IObj IPending]
10+
[org.apache.hadoop.fs Path]
1011
[org.apache.hadoop.filecache DistributedCache]))
1112

12-
;; Distributed value
13-
(deftype DVal [value readv params dcm]
14-
Object
15-
(toString [_]
16-
(let [v (if (realized? value) @value :pending)]
17-
(str "#<DVal: " (pr-str v) ">")))
18-
IDeref (deref [_] @value)
19-
IPending (isRealized [_] (realized? value)))
13+
(defn ^:private cache-name
14+
"Generate distinct distcache-entry name for `source`."
15+
[source] (str (gensym "dval-") "-" (-> source fs/path .getName)))
16+
17+
(defn ^:private dcpath*
  "Return a `Path` over `path` which also implements `IObj`, carrying metadata
with `:type` of `::dcpath` and `::dcname` of `dcname`."
  [dcname path]
  (let [path (str (fs/path path))
        ;; Named fn so that `withMeta` can build a fresh proxy over the same
        ;; closed-over path string with replacement metadata.  It accepts only
        ;; the metadata map; passing `path` as well would be an arity error.
        dcpath* (fn dcpath* [md]
                  (proxy [Path IObj] [path]
                    (meta [] md)
                    (withMeta [md] (dcpath* md))))]
    (dcpath* {:type ::dcpath, ::dcname dcname})))
25+
26+
(defn dcpath
27+
"Distributed-cache–able instance of `path`. When serialized into a job
28+
configuration, adds `path` to the distributed cache. When deserialized from a
29+
job configuration, reconstitutes as the cache path when available and as the
30+
original remote path when not (i.e. under local- or mixed-mode job execution)."
31+
[path]
32+
(let [path (fs/path path)
33+
dcname (cache-name path)]
34+
(dcpath* dcname path)))
35+
36+
(defmethod print-method ::dcpath
37+
[path ^Writer writer]
38+
(if (nil? cser/*conf*)
39+
(print-method (with-meta path nil) writer)
40+
(let [^String dcname (-> path meta ::dcname)]
41+
(fs/distcache! cser/*conf* {dcname path})
42+
(.write writer "#parkour/dcpath \"")
43+
(.write writer dcname)
44+
(.write writer "\""))))
2045

2146
(defn ^:private unfragment
2247
"The provided `uri`, but without any fragment."
@@ -28,74 +53,83 @@
2853
(let [uri-s (str uri), n (- (count uri-s) (count fragment) 1)]
2954
(fs/uri (subs uri-s 0 n))))))
3055

31-
(defn ^:private ->entry
32-
"Parse `remote` and `local` into mapping tuple of (fragment, cache path)."
33-
[^URI remote local]
56+
(defn ^:private resolve-source
57+
"Resolve distributed cache `remote` and `local` URIs to the \"most local\"
58+
available source path. Result will usually be a local file path, but may be the
59+
original remote path under mixed-mode job execution."
60+
[[^URI remote local]]
3461
(if-let [fragment (.getFragment remote)]
3562
(let [symlink (io/file fragment), symlink? (.exists symlink)
3663
local (io/file (str local)), local? (.exists local)
3764
remote (unfragment remote), remote? (= "file" (.getScheme remote))
3865
source (cond symlink? symlink, local? local, remote? remote
66+
;; Could localize, but issues: clean-up, directories
3967
(mr/local-runner? cser/*conf*) remote
4068
:else (throw (ex-info
4169
(str remote ": cannot locate local file")
4270
{:remote remote, :local local})))]
43-
[fragment (fs/path source)])))
71+
(fs/path source))))
4472

45-
(defn ^:private distcache-dval
46-
"Remote-side dval data reader, reconstituting as a delay."
47-
[[readv params cnames]]
73+
(defn ^:private dcpath-reader
74+
"EDN tagged-literal reader for dcpaths."
75+
[dcname]
4876
(let [remotes (seq (DistributedCache/getCacheFiles cser/*conf*))
49-
locals (seq (DistributedCache/getLocalCacheFiles cser/*conf*))
50-
_ (when (not= (count remotes) (count locals))
51-
(throw (ex-info "cache files do not match local files"
52-
{:remotes remotes, :locals locals})))
53-
entries (map ->entry remotes locals)
54-
cname->source (->> entries (remove nil?) (into {}))
55-
sources (map cname->source cnames)
56-
args (concat params sources)]
57-
(delay (apply readv args))))
77+
locals (or (seq (DistributedCache/getLocalCacheFiles cser/*conf*))
78+
(map unfragment remotes))]
79+
(if (not= (count remotes) (count locals))
80+
(throw (ex-info "cache files do not match local files"
81+
{:remotes remotes, :locals locals}))
82+
(->> (map vector remotes locals)
83+
(pr/ffilter (fn [[^URI r]] (= dcname (.getFragment r))))
84+
(resolve-source)
85+
(dcpath* dcname)))))
86+
87+
;; Distributed value
88+
(deftype DVal [value form]
89+
Object
90+
(toString [_]
91+
(let [v (if (realized? value) @value :pending)]
92+
(str "#<DVal: " (pr-str v) ">")))
93+
IDeref (deref [_] @value)
94+
IPending (isRealized [_] (realized? value)))
5895

5996
(defmethod print-method DVal
60-
[^DVal dval ^Writer writer]
61-
(let [repr (if (nil? cser/*conf*)
62-
(str dval)
63-
(let [v (.-readv dval), p (.-params dval), dcm (.-dcm dval)
64-
literal (pr-str [v p (-> dcm keys vec)])]
65-
(fs/distcache! cser/*conf* dcm)
66-
(str "#parkour/dval " literal)))]
67-
(.write writer repr)))
97+
[^DVal dval ^Writer w]
98+
(if (nil? cser/*conf*)
99+
(.write w (str dval))
100+
(do
101+
(.write w "#parkour/dval ")
102+
(.write w (pr-str (.-form dval))))))
103+
104+
(defn ^:private dval-reader
105+
"EDN tagged-literal reader for dvals."
106+
[[f & args]] (delay (apply f args)))
107+
108+
(defn ^:private dval*
109+
"Return a dval which locally proxies to `valref` and remotely will deserialize
110+
as a delay over applying var `readv` to `args`."
111+
[valref readv & args] (DVal. valref (cons readv args)))
112+
113+
(defn dval
114+
"Return a dval which acts as a delay over applying var `readv` to `args`."
115+
[readv & args] (apply dval* (delay (apply readv args)) readv args))
68116

69117
(defn ^:private identity-ref
70118
"Return reference which yields `x` when `deref`ed."
71-
[x]
72-
(reify
73-
IDeref (deref [_] x)
74-
IPending (isRealized [_] true)))
75-
76-
(defn ^:private cache-name
77-
"Generate distinct distcache-entry name for `source`."
78-
[source] (str (gensym "dval-") "-" (-> source fs/path .getName)))
119+
[x] (reify IDeref (deref [_] x), IPending (isRealized [_] true)))
79120

80-
(defn ^:private dval
81-
"Return a dval which locally holds `value` and remotely will deserialize by
82-
applying var `readv` to the concatenation of `params` and distributed copies of
83-
`sources`."
84-
([value readv sources] (dval value readv nil sources))
85-
([value readv params sources]
86-
(let [dcm (into {} (map (juxt cache-name fs/uri) sources))
87-
value (if (instance? IDeref value) value (identity-ref value))]
88-
(DVal. value readv params dcm))))
121+
(defn value-dval
122+
"Return a dval which locally holds `value` and remotely will deserialize as a
123+
delay over applying var `readv` to `args`."
124+
[value readv & args] (apply dval* (identity-ref value) readv args))
89125

90126
(defn load-dval
91-
"Return a dval which will deserialize by applying var `readv` to the
127+
"Return a delay-like dval which will realize by applying var `readv` to the
92128
concatenation of `params` and `sources` locally and distributed copies of
93129
`sources` remotely."
94130
([readv sources] (load-dval readv nil sources))
95131
([readv params sources]
96-
(let [args (concat params sources)
97-
value (delay (apply readv args))]
98-
(dval value readv params sources))))
132+
(apply dval readv (concat params (map dcpath sources)))))
99133

100134
(defn copy-dval
101135
"Like `load-dval`, but first copy `sources` to transient locations."
@@ -114,7 +148,7 @@ deserialize by calling var `readv` with a distributed copy of the transient
114148
serialization path."
115149
[writef readv value]
116150
(let [source (doto (transient-path) (writef value))]
117-
(dval value readv [source])))
151+
(value-dval value readv (dcpath source))))
118152

119153
(defn edn-dval
120154
"EDN-serialize `value` to a transient location and yield a wrapping dval."

0 commit comments

Comments
 (0)