I've been back in the UK this month, and am busy working on Hoya. The key activity right now is adding support for Apache Accumulo, because it forces me to make the changes needed to go multi-application, and so make Hoya more generic, sooner rather than later.
This means I've had to go from a simple model of (master, worker+) to a multi-role one of (master, tablets+, [monitor], [gc]).
A two-role model is straightforward: the AM forks off the master, while all the other containers are automatically workers. Compare the number of containers you have to the desired number, and request or release containers. When you are given containers, or one fails, you know it's always a worker, so update the numbers. Flexing is achieved by having the client specify a new count of workers. Simple.
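As a minimal sketch of that two-role bookkeeping (the class and method names are hypothetical, not Hoya's): one desired count and one actual count are all the AM needs, and their difference says how many containers to request or release.

```java
// Hypothetical sketch of the (master, worker+) model: every container the AM
// did not use for the master is a worker, so a single pair of counters suffices.
class WorkerReviewer {
    private int desired;
    private int actual;

    WorkerReviewer(int desired) {
        this.desired = desired;
    }

    // Flexing: the client just supplies a new worker count.
    void flex(int newDesired) {
        desired = newDesired;
    }

    // Any allocated container must be a worker.
    void onContainerAllocated() {
        actual++;
    }

    // Any failed or completed container was a worker too.
    void onContainerCompleted() {
        actual--;
    }

    // Positive: containers to request; negative: containers to release.
    int reviewWorkers() {
        return desired - actual;
    }
}
```

A real AM would also have to count outstanding requests, so that it doesn't ask for the same containers again on every review; that is left out here.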
Multi-role means that not only do you need more complex data structures, you also need to map containers and container requests to specific roles. When the RM notifies the AM of some new container assignments, the AM has to decide what they are for. The trick there -as suggested by one of my colleagues who I shall not name- is to use the priority:int field in the request, mapping that to role numbers. The AM then needs to spin up the relevant launcher for that role, and record details about the container so that on any failure notification it knows which role's node count to decrease. The AM then needs to scan the whole role set to decide whether the current state is good, and how to react if not. There's also the detail of configuration; you have to move from simple options such as --workers 4 --workerheap 512M to per-role numbers and options: --role worker 4 --roleopt worker jvm.heapsize 512M --role monitor 1 --roleopt monitor jvm.heapsize 1G
This gets turned into a set of role option maps, which have to be used to configure the specific roles.
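A minimal sketch of the priority trick and the per-role review, using hypothetical RoleStatus and RoleTracker classes and plain strings in place of YARN container IDs. Each role's priority doubles as its role number, so an allocated container maps straight back to a role, and the container-to-role map is what lets a failure notification decrement the right role's count.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical per-role bookkeeping: desired vs. actual node counts.
class RoleStatus {
    final String name;
    final int priority;   // doubles as the role number in container requests
    int desired;
    int actual;

    RoleStatus(String name, int priority, int desired) {
        this.name = name;
        this.priority = priority;
        this.desired = desired;
    }

    int delta() {
        return desired - actual;
    }
}

class RoleTracker {
    private final Map<Integer, RoleStatus> rolesByPriority = new LinkedHashMap<>();
    // Which role each live container was launched for.
    private final Map<String, RoleStatus> containerRoles = new HashMap<>();

    void addRole(RoleStatus role) {
        rolesByPriority.put(role.priority, role);
    }

    // The RM hands back a container; its priority says which role it is for.
    RoleStatus onContainerAllocated(String containerId, int priority) {
        RoleStatus role = rolesByPriority.get(priority);
        if (role == null) {
            throw new IllegalArgumentException("No role with priority " + priority);
        }
        role.actual++;
        containerRoles.put(containerId, role);
        return role;  // the caller would start this role's launcher
    }

    // On failure or completion, look up the role so the right count drops.
    void onContainerCompleted(String containerId) {
        RoleStatus role = containerRoles.remove(containerId);
        if (role != null) {
            role.actual--;
        }
    }

    // Scan the whole role set: role name -> containers to request (+) or release (-).
    Map<String, Integer> review() {
        Map<String, Integer> actions = new LinkedHashMap<>();
        for (RoleStatus role : rolesByPriority.values()) {
            if (role.delta() != 0) {
                actions.put(role.name, role.delta());
            }
        }
        return actions;
    }
}
```

In a real AM the priority would be read from the allocated YARN Container, which carries the priority it was allocated at, and the desired counts would come from the role option maps.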
I've also had to change how the client flexes the cluster size -from a simple int value to a whole new JSON cluster specification. This leads to an interesting trick: not only can you tune the size of a cluster by role, you can change parameters like heapsize as you go.
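Since a flex request is now a whole specification, the AM has to diff it against the current one. A sketch, assuming the JSON has already been parsed into role-to-option maps (parsing elided) and that each role's count is stored under a hypothetical "role.instances" key; SpecDiff and its methods are illustrative names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical diff between the running cluster spec and a flex request,
// each modelled as role name -> option map.
class SpecDiff {
    static final String INSTANCES = "role.instances";  // assumed key for the count

    // Role -> change in instance count; only roles whose count changes are listed.
    static Map<String, Integer> countChanges(Map<String, Map<String, String>> current,
                                             Map<String, Map<String, String>> proposed) {
        Set<String> roles = new HashSet<>(current.keySet());
        roles.addAll(proposed.keySet());
        Map<String, Integer> changes = new HashMap<>();
        for (String role : roles) {
            int delta = count(proposed, role) - count(current, role);
            if (delta != 0) {
                changes.put(role, delta);
            }
        }
        return changes;
    }

    // Existing roles whose other options (e.g. jvm.heapsize) differ in the new spec.
    static List<String> reconfigured(Map<String, Map<String, String>> current,
                                     Map<String, Map<String, String>> proposed) {
        List<String> roles = new ArrayList<>();
        for (Map.Entry<String, Map<String, String>> entry : proposed.entrySet()) {
            Map<String, String> before = current.get(entry.getKey());
            if (before != null && !withoutCount(before).equals(withoutCount(entry.getValue()))) {
                roles.add(entry.getKey());
            }
        }
        return roles;
    }

    private static int count(Map<String, Map<String, String>> spec, String role) {
        Map<String, String> options = spec.get(role);
        return options == null ? 0 : Integer.parseInt(options.getOrDefault(INSTANCES, "0"));
    }

    private static Map<String, String> withoutCount(Map<String, String> options) {
        Map<String, String> copy = new HashMap<>(options);
        copy.remove(INSTANCES);
        return copy;
    }
}
```

Splitting count changes from option changes keeps the resize logic the same as before, while the option diff tells the AM which roles' future containers need new settings.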
Multi-role then: lots more complexity in managing things.
The other bit of complexity is of course deploying different application types. Here I've introduced the notion of a Provider: something that provides client-side setup services such as building the initial JSON spec, preflight checking of specifications, helping to set up the AM with extra resources, and patching configuration directories.
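The four client-side services listed above map naturally onto an interface. The following is a hypothetical sketch, not Hoya's actual provider API: the method names, the use of role-to-option maps instead of a JSON document, and the example "single master" preflight rule are all assumptions for illustration.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical client-side provider contract; the spec is modelled as
// role name -> option map rather than as a JSON document.
interface ClientProvider {
    // Build the initial cluster specification for this application type.
    Map<String, Map<String, String>> createInitialSpec();

    // Preflight check: a list of problems, empty if the spec looks usable.
    List<String> validate(Map<String, Map<String, String>> spec);

    // Extra resources (jars, scripts, ...) the AM needs, by name.
    Map<String, File> amResources();

    // Patch the generated configuration directory before it is uploaded.
    void patchConfDir(File confDir, Map<String, Map<String, String>> spec);
}

// Hypothetical Accumulo flavour, filling in only the spec and preflight steps.
class AccumuloClientProvider implements ClientProvider {
    public Map<String, Map<String, String>> createInitialSpec() {
        Map<String, Map<String, String>> spec = new LinkedHashMap<>();
        spec.put("master", new LinkedHashMap<>(Map.of("role.instances", "1")));
        spec.put("tablet", new LinkedHashMap<>(Map.of("role.instances", "2")));
        return spec;
    }

    public List<String> validate(Map<String, Map<String, String>> spec) {
        List<String> problems = new ArrayList<>();
        Map<String, String> master = spec.get("master");
        // Example rule only: insist on exactly one master.
        if (master == null || !"1".equals(master.get("role.instances"))) {
            problems.add("exactly one master is required");
        }
        return problems;
    }

    public Map<String, File> amResources() {
        return Map.of();
    }

    public void patchConfDir(File confDir, Map<String, Map<String, String>> spec) {
        // A real provider would rewrite the site configuration here; no-op in this sketch.
    }
}
```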
Server side, these providers now need to help manage the roles, with in-VM startup of the master process and launcher-thread startup of all the remote processes. This is still ongoing work. I did have Accumulo execing, though its startup script needs the HADOOP_PREFIX and ZOOKEEPER_PREFIX environment variables as a preamble to its own classloading, which is something I may have to bypass before long on the basis that it is trying to be too clever. But bring up the Accumulo master and the first thing it says is "init me". I may patch Accumulo to add a special "--init" option here, but to keep the number of changes down, and to support similar use cases, I've instead gone from a single executable to the ability to run a series of programs in the AM's container. And how have I ended up implementing that? With YARN services.
I've added a new YARN service, ForkedProcessService, which executes a native process when started and stops itself when that process finishes, converting a non-zero exit code into a service failure with an ExitException carrying the relevant exit code. Any YARN service that can host children -of which the Hoya AM is one- can create one of these services, add it as a child, configure it and start it running. Registering as a service lifecycle listener lets the parent service be notified when the child has finished, so it can react to it failing or succeeding.
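The essence of such a service can be sketched with nothing but java.lang.ProcessBuilder. This is a stand-alone illustration, not Hoya's ForkedProcessService: the lifecycle is reduced to start(), a completion listener and a recorded failure instead of Hadoop's Service and ServiceStateChangeListener interfaces, and ForkedProcessSketch and ProcessExitException are hypothetical names.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for an exception that carries a process exit code.
class ProcessExitException extends RuntimeException {
    final int exitCode;

    ProcessExitException(int exitCode, String message) {
        super(message);
        this.exitCode = exitCode;
    }
}

// Forks a native process on start(); counts as stopped when the process exits,
// with a non-zero exit code recorded as a failure.
class ForkedProcessSketch {
    private final List<String> command;
    private final List<Consumer<ForkedProcessSketch>> listeners = new ArrayList<>();
    private volatile ProcessExitException failure;
    private Thread waiter;

    ForkedProcessSketch(List<String> command) {
        this.command = new ArrayList<>(command);
    }

    // Parents register here (before start()) to hear when this service stops.
    void addListener(Consumer<ForkedProcessSketch> listener) {
        listeners.add(listener);
    }

    void start() {
        final Process process;
        try {
            process = new ProcessBuilder(command).inheritIO().start();
        } catch (IOException e) {
            throw new UncheckedIOException("could not exec " + command, e);
        }
        waiter = new Thread(() -> {
            try {
                int exit = process.waitFor();
                if (exit != 0) {
                    failure = new ProcessExitException(exit, command + " exited with " + exit);
                }
            } catch (InterruptedException e) {
                process.destroy();
                failure = new ProcessExitException(-1, command + " interrupted");
            }
            // Stopped either way: tell the parent, which decides how to react.
            for (Consumer<ForkedProcessSketch> listener : listeners) {
                listener.accept(this);
            }
        });
        waiter.start();
    }

    // Block until the process has exited and the listeners have been called.
    void waitForStop() {
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Null when the process exited with code 0.
    ProcessExitException getFailure() {
        return failure;
    }
}
```

On a Unix-like host, running `new ForkedProcessSketch(List.of("sh", "-c", "exit 3"))` fires the listener once the shell exits, and getFailure().exitCode is then 3.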
A single ForkedProcessService works for HBase, with its one process "bin/hbase master", but not for Accumulo's potential workflow of [accumulo init, accumulo master]. For that I've had to write another service, SequenceService, which runs its children in sequential order, only starting one when the previous one finishes successfully. My provider services then extend that, to get the ability to exec a sequence of commands.
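```java
// Build the command line for the master process
List<String> commands =
    buildProcessCommand(cd, confDir, env, masterCommand);
// Wrap it in a service that forks the process; a non-zero exit fails the service
ForkedProcessService masterProcess = buildProcess(getName(), env, commands);
// Parent service that runs its children in parallel
CompoundService compound = new CompoundService(getName());
```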
The AM can now deploy a provider's in-AM role by telling the provider service to start() and letting it sort out the details for itself. Which is pretty slick: it's services throughout Hoya, with out-of-VM processes for the real work. To achieve this I've had to add two new container services. SequenceService runs a list of services in order, failing when any of the child services fails and stopping when they are all done. CompoundService runs a set of services in parallel, again propagating failures; once all its children have stopped, this parent service stops too.
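Both container services come down to a little listener wiring. Here is a self-contained sketch under a deliberately minimal, hypothetical lifecycle (MiniService, SequenceSketch, CompoundSketch and ImmediateService are illustrative names, not Hoya or Hadoop classes). The sequence starts the next child from the previous child's completion callback, and the compound counts down its running children.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical minimal lifecycle; not Hadoop's Service/ServiceStateChangeListener.
abstract class MiniService {
    private final List<Consumer<MiniService>> listeners = new ArrayList<>();
    private boolean stopped;
    private Exception failure;

    synchronized void addListener(Consumer<MiniService> listener) { listeners.add(listener); }

    abstract void start();

    // Record the outcome (null = success) exactly once, then notify listeners.
    protected void markStopped(Exception cause) {
        List<Consumer<MiniService>> toNotify;
        synchronized (this) {
            if (stopped) return;
            stopped = true;
            failure = cause;
            toNotify = new ArrayList<>(listeners);
        }
        for (Consumer<MiniService> listener : toNotify) listener.accept(this);
    }

    synchronized boolean isStopped() { return stopped; }

    synchronized Exception getFailure() { return failure; }
}

// Runs children one at a time; a child starts only after the previous one succeeded.
class SequenceSketch extends MiniService {
    private final List<MiniService> children;
    private int next;

    SequenceSketch(List<? extends MiniService> children) { this.children = new ArrayList<>(children); }

    void start() { startNext(); }

    private void startNext() {
        if (next == children.size()) {
            markStopped(null);                // every child finished successfully
            return;
        }
        MiniService child = children.get(next++);
        child.addListener(this::onChildStopped);
        child.start();
    }

    private void onChildStopped(MiniService child) {
        if (child.getFailure() != null) {
            markStopped(child.getFailure());  // propagate; later children never start
        } else {
            startNext();
        }
    }
}

// Runs children in parallel; fails on the first child failure, stops when all have stopped.
class CompoundSketch extends MiniService {
    private final List<MiniService> children;
    private final AtomicInteger running = new AtomicInteger();

    CompoundSketch(List<? extends MiniService> children) { this.children = new ArrayList<>(children); }

    void start() {
        running.set(children.size());
        if (children.isEmpty()) markStopped(null);
        for (MiniService child : children) {
            child.addListener(this::onChildStopped);
            child.start();
        }
    }

    private void onChildStopped(MiniService child) {
        if (child.getFailure() != null) {
            markStopped(child.getFailure());
        } else if (running.decrementAndGet() == 0) {
            markStopped(null);
        }
    }
}

// Demo child: stops as soon as it is started, with the given outcome.
class ImmediateService extends MiniService {
    private final Exception outcome;
    boolean started;

    ImmediateService(Exception outcome) { this.outcome = outcome; }

    void start() {
        started = true;
        markStopped(outcome);
    }
}
```

One design choice worth flagging: on a child failure this CompoundSketch reports the failure immediately but leaves its other children running; a production version would presumably stop them as well.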
Comparing this design to SmartFrog's workflow components, there are obvious similarities, though without the issues those components always had: because child services were potentially remote and only accessible via RMI, you couldn't interact with a stopped service, while the deployment model mostly mandated that everything was done declaratively at deploy time. In Hoya all the wiring up of services is done in code; there's no attempt to be more declarative, and with every service running in-process, keeping an eye on them is much easier. That said, I'm having to do things like pass in callback interfaces: the execInProgress interface is implemented by the AM so that it is told when the master process is going live. In SF I'd have stuck a reference to an exported RMI interface into the distributed component description graph, and let this workflow service pull it down. Still, here we avoid RMI, which is not a bad thing.
The other difference is that I am not trying to run any of these service components on any node in the cluster other than the single Hoya Application Master. The goal is to run on each allocated container only the final program Hoya is managing, not any Hoya code at all. YARN does the work of getting the binaries over, starting the process and reporting failure. That assumption of a failure-aware execution layer atop a shared filesystem, with ZooKeeper to provide consistent distributed state, means that I can delegate all those details to Hadoop.
Furthermore, I'm only trying to deploy applications that live in a Hadoop cluster -use HDFS for persistent state over a local filesystem, dynamically locate other parts of the application via ZK, and don't treat unannounced termination of a process or loss of a node as a disaster, more a mild inconvenience. This is precisely what today's classic Enterprise Java & Spring Server apps don't do: they use databases, message queues and often build-time URLs -if not build time, then certainly static for the lifespan of the program.
Handling those applications is a pain, because your deployment framework has to choreograph startup, wiring together distributed components of the system from the dynamically built-up hostnames and URLs of other components, try to execute the (expected) shutdown routines rather than just kill the processes, and, hardest of all, deal with the persistent data problem. Usually that's addressed by having a shared database, with the MQ routing messages to services.
Testing YARN applications
One thing I should discuss is the challenge of testing YARN applications, which I've now documented a bit on the wiki:
- MiniYARNCluster is your friend
- MiniHDFS cluster loses logs when torn down, but is handy to keep you honest -make sure some tests use it for YARN deployment
- Linux VMs are more realistic than an OS X desktop, though there is the overhead of getting protoc updated. Once you've done it -snapshot it.
I have thought about how to merge in the log output of the Hoya JVMs running in YARN containers. These JVMs generate data using the commons-logging API (Hadoop-internal) and SLF4J (Hoya), both of which are backed by Log4J. All I need to do is get that Log4J data merged with the output of the JUnit JVM.
Output all Log4J records in the JSON format of our little JSON log emitter, then merge the results from those VMs back in with the output of the JUnit JVM. The clocks on all the logs will be consistent (it's the same host/VM), so the records can be strictly ordered (I often end up using timestamps to compare what's going on in the RM & test runner with the AM). The output would then need to be reworked so that the log entries are presented in a human-readable format, with a process identifier on each line, instead of one JSON record per (long) line.
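The merge step is essentially a k-way merge by timestamp followed by reformatting. A minimal sketch, assuming each process's JSON log has already been parsed into LogEvent objects (JSON parsing elided; LogEvent, LogMerger and their fields are hypothetical) and that each per-process list is already in time order:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical parsed form of one JSON log record.
class LogEvent {
    final long timestamp;  // epoch millis; comparable across JVMs on the same host
    final String process;  // e.g. "test", "RM", "AM"
    final String level;
    final String message;

    LogEvent(long timestamp, String process, String level, String message) {
        this.timestamp = timestamp;
        this.process = process;
        this.level = level;
        this.message = message;
    }
}

class LogMerger {
    // k-way merge of per-process lists, each already in time order.
    static List<LogEvent> merge(List<List<LogEvent>> perProcess) {
        // Each queue entry is {list index, position in that list}; ties go to the lower list index.
        PriorityQueue<int[]> heads = new PriorityQueue<>(
                Comparator.comparingLong((int[] h) -> perProcess.get(h[0]).get(h[1]).timestamp)
                        .thenComparingInt(h -> h[0]));
        for (int i = 0; i < perProcess.size(); i++) {
            if (!perProcess.get(i).isEmpty()) {
                heads.add(new int[] {i, 0});
            }
        }
        List<LogEvent> merged = new ArrayList<>();
        while (!heads.isEmpty()) {
            int[] head = heads.poll();
            List<LogEvent> source = perProcess.get(head[0]);
            merged.add(source.get(head[1]));
            if (head[1] + 1 < source.size()) {
                heads.add(new int[] {head[0], head[1] + 1});
            }
        }
        return merged;
    }

    // One human-readable line per event, tagged with the process it came from.
    static String format(LogEvent e) {
        return String.format("%d [%s] %-5s %s", e.timestamp, e.process, e.level, e.message);
    }
}
```

Millisecond timestamps can collide, so the sketch breaks ties by input order to keep the output deterministic.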
Stream live to the test runner
Use a Log4J back end that forwards log events to a remote process, perhaps the test runner JVM itself. The collector would then output the log events intermixed with the local events. Again, we need to differentiate the processes.
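The collector side could be as simple as a socket server that accepts one connection per remote process and records each received line tagged with that process's name, interleaved with local events. This sketch uses only java.net and an assumed line protocol (a client's first line names its process, every later line is one formatted log event); LogCollector is a hypothetical name, and the Log4J appender on the sending side is replaced by a plain `send` helper.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical collector in the test-runner JVM, using the assumed line protocol.
class LogCollector implements AutoCloseable {
    private final ServerSocket server = open();
    private final List<String> lines = Collections.synchronizedList(new ArrayList<>());

    LogCollector() {
        Thread acceptor = new Thread(this::acceptLoop, "log-collector");
        acceptor.setDaemon(true);
        acceptor.start();
    }

    private static ServerSocket open() {
        try {
            return new ServerSocket(0, 50, InetAddress.getLoopbackAddress()); // any free port
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    int port() { return server.getLocalPort(); }

    // Local events share the same sink, so output stays interleaved in arrival order.
    void local(String event) { emit("test", event); }

    private void emit(String process, String event) {
        String line = "[" + process + "] " + event;
        lines.add(line);
        System.out.println(line);
    }

    private void acceptLoop() {
        while (!server.isClosed()) {
            try {
                Socket client = server.accept();
                Thread reader = new Thread(() -> readClient(client));
                reader.setDaemon(true);
                reader.start();
            } catch (IOException e) {
                return; // server socket closed
            }
        }
    }

    private void readClient(Socket client) {
        try (Socket s = client;
             BufferedReader in = new BufferedReader(
                     new InputStreamReader(s.getInputStream(), StandardCharsets.UTF_8))) {
            String process = in.readLine();
            String event;
            while (process != null && (event = in.readLine()) != null) emit(process, event);
        } catch (IOException e) {
            emit("collector", "lost connection: " + e);
        }
    }

    // Poll until at least n lines have been collected or the timeout expires.
    boolean awaitLines(int n, long timeoutMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (lines.size() < n && System.currentTimeMillis() < deadline) {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return lines.size() >= n;
    }

    List<String> received() {
        synchronized (lines) { return new ArrayList<>(lines); }
    }

    @Override
    public void close() {
        try { server.close(); } catch (IOException ignored) { }
    }

    // Stand-in for the sending side (e.g. a custom Log4J appender, not shown).
    static void send(int port, String process, String... events) {
        try (Socket s = new Socket(InetAddress.getLoopbackAddress(), port);
             PrintWriter out = new PrintWriter(
                     new OutputStreamWriter(s.getOutputStream(), StandardCharsets.UTF_8), true)) {
            out.println(process);
            for (String event : events) out.println(event);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```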
Back when I wrote the SmartFrog distributed xUnit test runner (JUnit3, SFUnit, ...), we did forward test results over the wire to a collector service, which could then aggregate test results from multiple processes, similar to the work in [Duarte06]. In this design we were running tests against multiple remote implementations, showing the results per test rather than per implementation, so highlighting where things behaved differently. We also streamed out the results as XHTML, emitting the summary record at the end, rather than trying to buffer everything in the JVM until the end of the run. That in-JVM buffering is needed because the classic Ant XML format puts the summary in the attributes of the XML document's root element. As a result, if the JVM dies: no output. If the test run generates too much log data: OOM and no output. Streaming XHTML means that you don't OOM from lots of log data, and if the process dies, your browser can still make sense of a lot of the details. Oh, and with the right style sheet the results were directly viewable in the browser without the overhead of the XSL transform (though that always helps with summary data).
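The difference between the two report styles is where the summary goes. A sketch of the streaming approach, with hypothetical class and method names: each result is written and flushed as soon as it arrives, and the totals are emitted only at the end, so nothing has to be buffered in memory and the rows already flushed remain in the output even if the JVM later dies. Escaping is kept to the minimum needed for element content.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.io.Writer;

// Hypothetical streaming XHTML test report: one row per result as it happens,
// summary last. The Ant XML format instead puts the totals in attributes of the
// root element, so it can't be written until the run has finished.
class StreamingReport {
    private final Writer out;
    private int passed;
    private int failed;

    StreamingReport(Writer out) {
        this.out = out;
        write("<html xmlns=\"http://www.w3.org/1999/xhtml\"><body><table>\n");
    }

    void result(String test, boolean ok, String detail) {
        if (ok) {
            passed++;
        } else {
            failed++;
        }
        write("<tr class=\"" + (ok ? "pass" : "fail") + "\"><td>" + escape(test)
                + "</td><td>" + escape(detail) + "</td></tr>\n");
    }

    // Only now is the document complete and the summary known.
    void finish() {
        write("</table><p>passed: " + passed + ", failed: " + failed + "</p></body></html>\n");
    }

    private void write(String s) {
        try {
            out.write(s);
            out.flush();  // push each row out rather than holding it in memory
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private static String escape(String s) {
        return s.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;");
    }
}
```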
What, then, would I like, even though I don't have time to implement it?
- Test runner to collect log events from multiple processes
- Test runner to stream these events out to a file, preserving log level and adding host information
- Test runner to print to stdout some feedback on what is going on so that someone sitting in front of a screen can see what is happening. This should be in a format for humans
- Something to take the streamed output and generate the traditional XML format for the benefit of Jenkins and other CI tools
- Something to take that streamed output and generate reports that can intermix the log events from different processes, and, by virtue of the retained log level data, let me show/hide different log levels, so that I can say "show the warnings", leaving info and debug hidden.
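The show/hide idea in the last item only works if each event keeps its level. A minimal sketch, assuming a hypothetical StreamedEvent type read back from the streamed output and using Log4J's usual severity ordering (TRACE < DEBUG < INFO < WARN < ERROR < FATAL):

```java
import java.util.ArrayList;
import java.util.List;

// Severity order, lowest first, following Log4J's standard levels.
enum LogLevel { TRACE, DEBUG, INFO, WARN, ERROR, FATAL }

// Hypothetical event as read back from the streamed test output.
class StreamedEvent {
    final String host;
    final String process;
    final LogLevel level;
    final String message;

    StreamedEvent(String host, String process, LogLevel level, String message) {
        this.host = host;
        this.process = process;
        this.level = level;
        this.message = message;
    }
}

class LevelFilter {
    // Keep events at or above the threshold: WARN shows warnings and errors,
    // leaving INFO and DEBUG hidden.
    static List<StreamedEvent> atLeast(List<StreamedEvent> events, LogLevel threshold) {
        List<StreamedEvent> shown = new ArrayList<>();
        for (StreamedEvent event : events) {
            if (event.level.compareTo(threshold) >= 0) {
                shown.add(event);
            }
        }
        return shown;
    }
}
```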
Like I said: no time. But maybe next year I'll mentor a Google Summer of Code project to do this -it's a great cross-Apache bit of work, with Jenkins and possibly JUnit & TestNG in the mix. Target Hadoop, Bigtop and other bits of the Hadoop stack. Yes, a nice project: volunteers with free time invited.
[Photo: somewhere in SF's Mission District: like Stokes Croft except with high-end furniture shops and the graffiti is mostly on the side streets, rather than integrated with the shops]