I've just committed HADOOP-13786, Add S3A committer for zero-rename commits to S3 endpoints. Contributed by Steve Loughran and Ryan Blue.
This is a serious and complex piece of work; I need to thank:
- Thomas Demoor and Ewan Higgs from WDC for their advice and testing. They understand the intricacies of the S3 protocol to the millimetre.
- Ryan Blue for his Staging-based S3 committer. The core algorithms and code will be in hadoop-aws come Hadoop 3.1.
- Colleagues for their support, including the illustrious Sanjay Radia, and Ram Venkatesh for letting me put so much time into this.
- Reviewers, especially Ryan Blue, Ewan Higgs, Mingliang Liu and extra especially Aaron Fabbri @ Cloudera. It's a big piece of code to learn. This is the first time a patch of mine has ever crossed the 1 MB source barrier.
- The two different algorithms, v1 and v2, have very different semantics about the atomicity of task and job commits, including when output becomes visible in the destination directory.
- Neither algorithm is atomic in both task and job commit.
- V1 is atomic in task commit, but its job commit is non-atomic and O(files). It can recover from a job failure without having to rerun all the succeeded tasks, but not from a failure during job commit itself.
- V2's job commit is a repeatable, atomic O(1) operation, because it is a no-op. Task commits do the move/merge, which is O(files) and makes the output immediately visible; as a consequence, if a job fails partway through, the output directory is in an unknown state.
- Both algorithms depend on the filesystem having consistent listings and create/update/delete operations.
- The routine to merge the output of a task into the destination is a real-world example of a co-recursive algorithm. These are so rare that most developers don't even know the term for them, or have forgotten it.
- At-most-once execution is guaranteed by having tasks and the AM fail when they recognise that they are in trouble.
- The App Master refuses to commit a job if it hasn't had a heartbeat with the YARN Resource Manager within a specific time period. This stops it committing work when the network is partitioned and the AM/RM protocol fails: YARN may already have considered the job dead and restarted it.
- Tasks commit iff they get permission from the AM; thus they will not attempt to commit if the network partitions.
- If a task given permission to commit does not report a successful commit to the AM, the V1 algorithm can rerun the task; V2 must conclude the output is in an unknown state and abort the job.
- Spark can commit using the Hadoop FileOutputCommitter; its Parquet support has some "special" code which refuses to work unless the committer is a subclass of ParquetOutputCommitter. That special-casing makes Parquet the hardest format to bind to the new committers; ORC, CSV and Avro all work out of the box.
- The patch adds the ability for tasks to provide extra data to the job driver for use in job commit; this allows committers to pass commit information directly to the driver, rather than indirectly via the (consistent) filesystem.
- Everyone's code assumes that abort() completes in a bounded time, and never throws the IOException its signature promises it can.
- There's lots of cruft in the MRv2 codebase to keep the MRv1 code alive, which it would be really good to delete.
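For reference, the choice between the v1 and v2 algorithms above comes down to a single switch, set in mapred-site.xml or per-job:

```xml
<property>
  <name>mapreduce.fileoutputcommitter.algorithm.version</name>
  <value>1</value>
</property>
```

Pick 1 for the recoverable-but-slow job commit, 2 for the fast no-op job commit with the failure semantics described above.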
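The tree-walking merge mentioned above is easiest to see in miniature. Here's an illustrative Python sketch — not Hadoop's actual FileOutputCommitter code — modelling directories as nested dicts and files as leaf values:

```python
def merge(src, dest):
    """Merge the task tree `src` into the job tree `dest`.

    Illustrative sketch only: directories present on both sides are
    merged by recursing into them; anything else (a file, or a
    directory with no counterpart) is moved wholesale, the moral
    equivalent of a single rename().
    """
    for name, node in src.items():
        if isinstance(node, dict) and isinstance(dest.get(name), dict):
            merge(node, dest[name])   # directory on both sides: recurse
        else:
            dest[name] = node         # file, or no clash: one "rename"


task = {"year=2017": {"part-0001": "a"}, "_SUCCESS": ""}
job = {"year=2017": {"part-0000": "b"}}
merge(task, job)
```

The recursion only happens when a directory already exists on both sides; everything else is a single rename, which is why the cost is O(files) on a real filesystem, and why it hurts so much when "rename" is actually a copy.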
If we had some TLA+ specifications of filesystems and object stores, we could perhaps write the algorithms as PlusCal examples, but that needs someone with the skills and the time. I'd have to find the time to learn TLA+ properly as well as specify everything, so it won't be me.
Returning to the committers, what do they do which is so special?
They upload task output to the final destination paths in the tasks, but don't make the uploads visible until the job is committed.
No renames, no copies, no job-commit-time merges, and no data visible until job commit. Tasks which fail/fail to commit do not have any adverse side effects on the destination directories.
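That protocol can be modelled in a few lines. Here's a toy, in-memory stand-in (all names hypothetical, nothing to do with the real S3A code) for an object store with multipart uploads, where data written during task commit only becomes visible once the job driver completes the upload:

```python
import uuid


class ToyStore:
    """Toy object store: uploaded data stays invisible in listings
    until its upload is completed, mimicking S3 multipart uploads."""

    def __init__(self):
        self.objects = {}   # visible objects: path -> data
        self.pending = {}   # invisible uploads: upload_id -> (path, data)

    def initiate_upload(self, path, data):
        upload_id = str(uuid.uuid4())
        self.pending[upload_id] = (path, data)
        return upload_id

    def complete_upload(self, upload_id):
        path, data = self.pending.pop(upload_id)
        self.objects[path] = data       # only now does it become visible

    def abort_upload(self, upload_id):
        self.pending.pop(upload_id, None)


# Task commit: write the output, keep it invisible, hand the upload
# IDs (the "commit data") over to the job driver.
store = ToyStore()
uploads = [store.initiate_upload("dest/part-0000", b"rows...")]
assert "dest/part-0000" not in store.objects    # nothing visible yet

# Job commit: a handful of completion calls; no renames, no copies.
for upload_id in uploads:
    store.complete_upload(upload_id)
```

The upload IDs play the role of the commit data the tasks pass to the job driver; aborting a failed job is just a matter of aborting the pending uploads, leaving the destination untouched.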
First, read S3A Committers: Architecture and Implementation.
Then, if that seems interesting look at the source.
A key feature is that we've snuck into FileOutputFormat a mechanism to allow you to provide different committers for different filesystem schemes.
Normal file output formats (i.e. not Parquet) will automatically get the committer for the target filesystem, which, for S3A, can be changed from the default FileOutputCommitter to an S3A-specific one. Any other object store which offers delayed materialization of uploaded data can implement its own committer and run it alongside the S3A ones; something to keep the Azure, GCS and OpenStack teams busy, perhaps.
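The binding is configuration-driven. Something along these lines — property names as documented for the Hadoop 3.1 S3A committers — selects a committer factory per filesystem scheme, and then one of the S3A committers by name:

```xml
<!-- Use the S3A committer factory for s3a:// destinations -->
<property>
  <name>mapreduce.outputcommitter.factory.scheme.s3a</name>
  <value>org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory</value>
</property>

<!-- Which S3A committer to use: directory, partitioned or magic -->
<property>
  <name>fs.s3a.committer.name</name>
  <value>directory</value>
</property>
```

Any other filesystem scheme without an explicit factory falls back to the classic FileOutputCommitter.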
For now though: users of Hadoop can use Amazon S3 (or compatible services) as the direct destination of Hadoop and Spark workloads, without the overhead of copying data, and with support for failure recovery and speculative execution. I'm happy with that as a good first step.
(photo: street vendors at the Kenya/Tanzania Border)