Updated: 22 nov 2013
We’ll look briefly in how you would utilize awesomeness of both Cascalog and HyperLogLog in order to execute Hadoop M/R tasks with amounts of data too big to have them in their original form.
We’ll deal with an example that is similar in its mechanics with a task we face ourselves at Screen6, yet being from another market — passenger transportation.
We are located in Amsterdam, and here, in Netherlands, you have this simple and easy system of public transport accessed by NFC card and which works as follows: prior to accessing the public transport (whether that be a tram, a bus or a train for that matter), you check-in by scanning your card, and once you get to your destination — you check-out with that same NFC card (which is called “OV-Chipkaart”).
It’s convenient, I use all the time, whenever I’m neither biking or walking.
Now imagine you have all these populous cities and people commuting between them on a daily/weekly/monthly/yearly basis, how would you monitor that traffic?
Say, you want to see distribution of passengers in a certain city district (identified by zipcode) over time, and on top of that you’d love to see that same distribution over a month for two districts? Or three districts? Or two cities altogether?
You can just store all events without any aggregations and directly do queries on that dataset, but it possesses two issues within itself: perfomance concern, as we are dealing with what supposed to be a very big amount of data, and hence — persistence issues.
You cannot just store the list of unique passengers in all the districts per each smallest time-unit — it will be simply way too big to process and access for further analytics; and you cannot just store count of unique passengers, as that information is useless once you come to merging traffic in the different districts and cities (certainly those sets intersect heavily — it is very common to live in The Hague and work either in Amsterdam, Amsterdam Sloterdijk or Schiphol).
That is where a caridnality estimator comes in handy. It doesn’t provide you with an exact number, but rather tells you what the cardinality of a certain set is with a desired error margin, yet being comparatively dense to the initial set of original items and allowing you to merge different sets.
A certain number of algorithms has popped out lately one of them being HyperLogLog, and that is the one we are taking with us on our Cascalog-ride.
We are assuming that we have OV-Chipkaart access-logs: each and every swipe (Log file line) gives you a record of
||Ms. from Epoch||1363026503|
Table 1. OV-Chipkaart access log record format
Now, we didn’t get any logs from OV-Chipkaarts proprietor (though, I would’ve been delighted to get my hands on them), so we will have to generate some randomized data them ourselves. Here is the clojure code snippet code that will do just that:
(def cities ["Amsterdam" "Groningen" "Haarlem" "Den-Haag" "Utrecht" "Delft" "Edam" ]) (def tags (repeatedly 10000 #(str (java.util.UUID/randomUUID)))) (defn line  (let [city (rand-nth cities) district (+ (+ 10 (rand-int 89)) (* (+ 10 (rand-int 89)) 100)) tag (rand-nth tags) timestamp (+ 1351680873 (rand-int 31536001))] (str city " " district " " tag " " timestamp "\n"))) (use 'clojure.java.io) (with-open [wrtr (writer "./ov-chipkaart-accesslogs.txt")] (dotimes [_ 5000000] (.write wrtr (line))))
This would provide us with a file of approximate 157Mb size holding 5.000.000 OV-chipkaart accesslog records; gziped — 78Mb.
Normally you would define one
source function per dataset type as a simple cascalog query, that does reading, mapping, type checks and conversions. Our example won’t be any different — we’ll read the file line-by-line from Hadoop File System (which will nicely pickup both gzippped and not gzipped files for us):
(ns ... (:require ... [cascalog.api :as c] [cascalog.ops :as ops])) (def ov-fields ["?city" "?district" "?uuid" "?timestamp"]) (defn ov-source "" [directory] (let [source (get-tap directory)] (c/<- ov-fields (source :>> ov-fields))))
Now we have an
ov-source cascalog query, which can be utilized as an incoming data stream; without caring too much if the data is correct or not.
Now, you can either go and implement your own implementation of HyperLogLog that will suit your needs, or you can pick one of the following already existing opensourced implementatoins:
|JCommon||HyperLogLog.java||Java||Apache License 2.0|
|Algebird||HyperLogLog.scala||Scala||Apache License 2.0|
|AddThis||Stream-Lib||HyperLogLog.java||Java||Apache License 2.0|
|indie!||yukim’s gists||HyperLogLog.java||Java||Apache License 2.0|
Table 2. Available HyperLogLog implementations
We chose AddThis’s Stream-Lib’s implementation, as from my subjective point of view it seemed to be most clear, nicely documented and reasonably implemented; besides, they added a bunch of other sweet things for cardinality estimation in that same library, together with the list of papers their implementations were based upon.
There are two approaches to merging HyperLogLog values:
Depending on the case, you might even want to construct an offer string for your HyperLogLog value as composite key of multiple values in the row. But no matter what you do, keep in mind — it is way better and much more efficient to keep the offers and merge those into the existing HyperLogLog value, rather than merging multiple HyperLogLog values.
Lets drop a little code sketch real quick to compare them:
(ns ... (:import [com.clearspring.analytics.stream.cardinality HyperLogLog HyperLogLog$Builder])) ;; single hyperloglog value, multiple inserts (time (dotimes [n 1000000] (.offer h (str "check" n)))) ;; and this is like if we are merging hyperloglogs all the time; ;; note that we have to create hyperloglog value every time (time (dotimes [n 1000000] (merge h (let [nh (create)] (.offer nh (str "check" n)) nh))))
Here is a sample time comparison
|operation||execution 1||execution 2||execution 3||execution 4|
||1513.87 msecs||1479.377 msecs||1493.119 msecs||1485.23 msecs|
||7433.302 msecs||7306.165 msecs||7369.459 msecs||7246.366 msecs|
Table 3. HyperLogLog offer vs merge execution times
Now, once we have read files, reducing in Cascalog is rather easy and straightforward, yet we will throw in some code in order to deal with HyperLogLog values in an easy way:
(defprotocol IHyperLogLogMerge (hyperloglog-val [this]) (merge [this other]) (merge-with-hyperloglog [this other-hll])) (extend-protocol IHyperLogLogMerge nil (hyperloglog-val [this] nil) (merge-with-hyperloglog [this other-hll] other-hll) (merge [this other] other) Object (hyperloglog-val [this] (doto (create) (.offer this))) (merge-with-hyperloglog [this other-hll] (.offer other-hll this) other-hll) (merge [this other] (merge (hyperloglog-val other) this)) HyperLogLog (hyperloglog-val [this] this) (merge-with-hyperloglog [this other-hll] (.addAll this other-hll) this) (merge [this other] (merge-with-hyperloglog other this)))
As you can see, this will merge whatever you feed it, and provide you with the HyperLogLog value. Though, not used in this particular example, will be used extensively in reallife scenario, when aggregating already obtainded aggregated results (that will hold not offers, but HyperLogLog values).
And then finally, let’s throw in some glue to aggregate HyperLogLog values in the Cascalog queries:
1 (c/defaggregateop sum* 2 ( (create)) 3 ([state val] 4 (.offer state val) 5 state) 6 ([state] [state])) 7 8 (def sum 9 (ops/each sum*))
This operations will “sum up” all the offers, the same way you would’ve use
cascalog.ops/sum operation on numerical values, you can use this operation on HyperLogLog values. In a cascalog query it will end up simply as:
(c/<- [?city ?district ?hll] (ov-source :>> ov-fields) (hll/sum ?uuid :> ?hll))
HyperLogLog is an object, basically a set or register set’s under the hood, and might you want to persist it — you will require to serialize it somehow.
Now in order to use some sort of Tap, you’ll need to store your bytes sequence, which in case of Cascalog is in a form of string of some kind; since at the moment we use both JDBC Taps and plain CSV files, without diving too deep, we simply encode it with
base64 and throw it is as an UTF-8 string; so the whole cascalog query will look like this:
1 (c/defmapop stringify [hll-object] 2 [(Base64/encodeBase64String (.getBytes hll-object))]) 3 4 (defn get-day-n-year [epoch-time] 5 (let [ epoch-time-long (Long/parseLong epoch-time) 6 in-millis (* epoch-time-long 1000) 7 date (time2/from-long in-millis)] 8 [(.getDayOfYear date) (.getYear date)])) 9 10 (defn count-gvb-passengers [path] 11 (let [ov-source (ov-source path)] 12 (c/<- [?city ?district ?day ?year ?cardinality ?base64-hll] 13 14 (:trap (c/hfs-textline "/tmp/hll-demo-errors" :sinkmode :replace )) 15 16 (ov-source :>> ov-fields) 17 18 (hll/sum ?uuid :> ?hll) 19 (cardinality ?hll :> ?cardinality) 20 (get-day-n-year ?timestamp :> ?day ?year) 21 (hll/stringify ?hll :> ?base64-hll))))
Note that we throw in day and year parsing.
Now we have nice rows grouped by
(district, day) key with the amount of passengers seen that day, which can be cheaply merged in the runtime (whenever you request statistics through any online tooling).
And, of course, here is the full gist, of the code I used to demo HyperLogLog with Cascalog in this article; in order to run:
$ git clone https://gist.github.com/7319327.git $ cd cd ./7319327/ $ lein demo ./ov-chipkaart-accesslogs.txt
It will produce output like:
Table 4. Demo output