frozen 1.0
When we try to process a huge amount of data, it nearly always ends with building a complex, hard-to-maintain system full of all sorts of tools: from split + rsync to move files between computers to hardcore scripts in every possible language. Some of these problems have been solved recently, notably by the appearance of ZeroMQ, which really helps. But you still cannot jump onto a problem and solve it quickly: you always have to spend some time preparing and building blocks, and only then can you solve the actual problem. Frozen tries to fill this gap by providing these blocks. Let's see how we can build something distributed.
Let's start with something simple and then move on to more complex things. Assume we have a huge file with data represented as lines. We want to split it into several groups, and each group has its own special handler.
To read the data we use the following config:
{ class = "thread" },
{ class = "query", data = (file_t){ filename = "/var/log/messages", readonly = (uint_t)"1" }, request = {
action = (action_t)"transfer",
destination = (machine_t){
// split file chunks to lines
{ class = "split" },
// send to next process
{ class = "modules/zeromq", convert = (uint_t)"1", socket = (zeromq_t){
type = "push",
connect = "tcp://127.0.0.1:8800"
}},
{ class = "end" }
}
}},
{ class = "kill" }
We just read data from the file, split it into lines and send them via ZeroMQ to a pool of workers. We do not bother with making this fast here, because we do not need to. If you have huge data, you probably have a huge cluster to process it, and very likely a distributed filesystem on it. Every cluster configuration has its own advantages and disadvantages: maybe you have several data storage servers that are very good at reading speed, or maybe you have commodity hardware with slow HDD reads. In any case, consider the best approach for your cluster and use it. We can start as many readers on as many files as we want, because ZeroMQ lets us merge these flows into one.
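To make the reader's data flow concrete, here is a small Python sketch of it. It is not frozen's API and does not use ZeroMQ: a `queue.Queue` stands in for the PUSH/PULL pair, and `read_chunks`, `split_lines`, `reader` and `run_readers` are hypothetical names. Because the file is read in chunks, a line can straddle two of them, so the splitter carries the unfinished tail over to the next chunk. Several reader threads feeding one queue mimic many readers merging into a single flow.

```python
import queue
import threading
from typing import Iterable, Iterator, List


def read_chunks(path: str, size: int = 4096) -> Iterator[bytes]:
    """Read a file in fixed-size chunks (roughly the file query step)."""
    with open(path, "rb") as f:
        while chunk := f.read(size):
            yield chunk


def split_lines(chunks: Iterable[bytes]) -> Iterator[bytes]:
    """Turn arbitrary chunks into whole lines (roughly the "split" step).

    A line may straddle two chunks, so the unfinished tail of one chunk
    is carried over and prefixed to the next.
    """
    tail = b""
    for chunk in chunks:
        *lines, tail = (tail + chunk).split(b"\n")
        for line in lines:
            yield line + b"\n"
    if tail:  # final line without a trailing newline
        yield tail


def reader(path: str, out: "queue.Queue[bytes]", size: int) -> None:
    """One reader: file -> lines -> shared queue (stand-in for a PUSH socket)."""
    for line in split_lines(read_chunks(path, size)):
        out.put(line)


def run_readers(paths: List[str], size: int = 4096) -> List[bytes]:
    """Run one reader thread per file; all of them feed a single queue (fan-in)."""
    merged: "queue.Queue[bytes]" = queue.Queue()
    threads = [threading.Thread(target=reader, args=(p, merged, size)) for p in paths]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    lines = []
    while not merged.empty():
        lines.append(merged.get())
    return lines
```

Lines coming from different files interleave in no particular order, which is also what you should expect when several PUSH sockets feed one PULL socket.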
First, we create a dummy worker that only prints our data, so we can make sure everything is OK.
{ class = "thread", loop = (uint_t)"1" },
// get message
{ class = "modules/zeromq", socket = (zeromq_t){
type = "pull",
bind = "tcp://127.0.0.1:8800"
}},
// print to console
{ class = "query", data = (fd_t)"stdout", request = { action = (action_t)"write", buffer = (env_t)"buffer" } },
// send to next process
{ class = "modules/zeromq", socket = (zeromq_t){
type = "push",
connect = "tcp://127.0.0.1:8801"
}},
{ class = "end" }
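The dummy worker is a loop of three steps: pull, print, push. The Python sketch below models it with two `queue.Queue` objects in place of the ZeroMQ sockets. `dummy_worker` and the `STOP` sentinel are hypothetical; the sentinel exists only so the sketch can terminate, since the frozen thread with `loop = 1` simply keeps running.

```python
import queue
import sys
from typing import Optional, TextIO

# Hypothetical end-of-stream marker so the sketch can stop;
# the frozen worker (loop = 1) simply runs forever.
STOP: Optional[bytes] = None


def dummy_worker(inbox: "queue.Queue[Optional[bytes]]",
                 outbox: "queue.Queue[Optional[bytes]]",
                 console: TextIO = sys.stdout) -> None:
    """Pull a message, print it, push it on unchanged."""
    while True:
        msg = inbox.get()                # "pull" from the upstream socket
        if msg is STOP:
            outbox.put(STOP)             # pass the shutdown signal downstream
            return
        console.write(msg.decode("utf-8", errors="replace"))  # print to console
        outbox.put(msg)                  # "push" to the next process
```

Running it in a thread between two queues gives the same shape as the config: everything that comes in is echoed and forwarded untouched.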
The next step is to decide which data goes where. Let's do it in a straightforward way for now, with data/regexp.
{ class = "thread", loop = (uint_t)"1" },
// get message
{ class = "modules/zeromq", socket = (zeromq_t){
type = "pull",
bind = "tcp://127.0.0.1:8800"
}},
// process only messages with "dhcp" in it
{ class = "regexp", regexp = "dhcp" },
{ class = "switch", rules = {
{
request = { marker = (uint_t)"1" },
machine = (machine_t){
// print to console
{ class = "query", data = (fd_t)"stdout", request = { action = (action_t)"write", buffer = (env_t)"buffer" } },
// send to next process
{ class = "modules/zeromq", socket = (zeromq_t){
type = "push",
connect = "tcp://127.0.0.1:8801"
}},
{ class = "end" }
}
}
}},
{ class = "end" }
Here we grep the lines containing "dhcp" and send them to another process. See data/regexp, request/switch and Tutorial: most used modules for more complex examples.
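The regexp-plus-switch worker can be read as "compute a marker, then dispatch on it". This Python sketch assumes, based on the rule `request = { marker = (uint_t)"1" }` rather than on anything stated in the text, that data/regexp sets the marker to 1 on a match. `marker_for`, `switch` and the rules dictionary are hypothetical names. Keying handlers by marker value leaves room for several groups with their own handlers, although this single-pattern marker only ever yields 0 or 1.

```python
import re
from typing import Callable, Dict, Iterable

Handler = Callable[[str], None]


def marker_for(line: str, pattern: "re.Pattern[str]") -> int:
    """Assumed behaviour of the regexp step: marker = 1 on a match, else 0."""
    return 1 if pattern.search(line) else 0


def switch(lines: Iterable[str], pattern: "re.Pattern[str]",
           rules: Dict[int, Handler]) -> None:
    """Send each line to the handler registered for its marker.

    Lines whose marker has no rule are dropped, mirroring a switch
    that only has a rule for marker = 1.
    """
    for line in lines:
        handler = rules.get(marker_for(line, pattern))
        if handler is not None:
            handler(line)
```

With `rules = {1: forward}`, where `forward` would push to the next process, only the lines containing "dhcp" leave the worker.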
After processing, we want to save the results; let's save them to a simple file.
{ class = "thread", loop = (uint_t)"1" },
{ class = "modules/zeromq", socket = (zeromq_t){
type = "pull",
bind = "tcp://127.0.0.1:8801"
}},
{ class = "query",
data = (slider_t){
data = (file_t){ filename = "result.dat" }
},
request = {
action = (action_t)"write",
buffer = (env_t)"buffer"
}
},
{ class = "end" }
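The destination wraps result.dat in a slider_t. Our reading, which is an assumption the text does not confirm, is that the slider keeps a running offset so each incoming buffer is written after the previous one instead of at the start of the file. The Python sketch below models that; `SlidingFile` and `save_all` are hypothetical, and truncating the file on open is a choice of the sketch, not documented frozen behaviour.

```python
from typing import BinaryIO, Iterable


class SlidingFile:
    """Hypothetical model of slider_t wrapping file_t (an assumption).

    Every write lands at the current offset, which then advances by the
    number of bytes written, so messages are stored back to back.
    """

    def __init__(self, filename: str) -> None:
        # Truncating on open is this sketch's choice, not documented frozen behaviour.
        self.file: BinaryIO = open(filename, "wb")
        self.offset = 0

    def write(self, buffer: bytes) -> None:
        self.file.seek(self.offset)            # write at the current position
        self.offset += self.file.write(buffer)  # then slide past what was written

    def close(self) -> None:
        self.file.close()


def save_all(messages: Iterable[bytes], filename: str = "result.dat") -> None:
    """Destination loop: write every pulled message to the file, in order."""
    sink = SlidingFile(filename)
    try:
        for msg in messages:
            sink.write(msg)
    finally:
        sink.close()
```

Without the advancing offset, every write would target the same position and each message would overwrite the previous one.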
As you may notice, all ZeroMQ addresses are local, so everything has to be started on the local machine; it will work on a cluster once you change the addresses to the correct ones.
$ frozend -c examples/article_dist_destination.m4
$ frozend -c examples/article_dist_worker_reg.m4
$ frozend -c examples/article_dist_source.m4
When the source finishes reading, we can see the grepped data in the worker's console, and the destination has written the data to the file as well.
That was easy, but not very useful. Here is how we can improve things: