
Oreilly.Hadoop.The.Definitive.Guide.2nd.Edition.Oct.201

 

 
 
Published:  November 25, 2011
 
Slide 3: SECOND EDITION Hadoop: The Definitive Guide Tom White foreword by Doug Cutting Beijing • Cambridge • Farnham • Köln • Sebastopol • Tokyo
Slide 4: Hadoop: The Definitive Guide, Second Edition by Tom White Copyright © 2011 Tom White. All rights reserved. Printed in the United States of America. Published by O’Reilly Media, Inc., 1005 Gravenstein Highway North, Sebastopol, CA 95472. O’Reilly books may be purchased for educational, business, or sales promotional use. Online editions are also available for most titles (http://my.safaribooksonline.com). For more information, contact our corporate/institutional sales department: (800) 998-9938 or corporate@oreilly.com.
Editor: Mike Loukides. Production Editor: Adam Zaremba. Proofreader: Diane Il Grande. Indexer: Jay Book Services. Cover Designer: Karen Montgomery. Interior Designer: David Futato. Illustrator: Robert Romano.
Printing History: June 2009: First Edition. October 2010: Second Edition.
Nutshell Handbook, the Nutshell Handbook logo, and the O’Reilly logo are registered trademarks of O’Reilly Media, Inc. Hadoop: The Definitive Guide, the image of an African elephant, and related trade dress are trademarks of O’Reilly Media, Inc. Many of the designations used by manufacturers and sellers to distinguish their products are claimed as trademarks. Where those designations appear in this book, and O’Reilly Media, Inc. was aware of a trademark claim, the designations have been printed in caps or initial caps. While every precaution has been taken in the preparation of this book, the publisher and author assume no responsibility for errors or omissions, or for damages resulting from the use of the information contained herein. ISBN: 978-1-449-38973-4 [SB] 1285179414
Slide 5: For Eliane, Emilia, and Lottie
Slide 7: Table of Contents
Foreword
Preface
1. Meet Hadoop: Data!; Data Storage and Analysis; Comparison with Other Systems; RDBMS; Grid Computing; Volunteer Computing; A Brief History of Hadoop; Apache Hadoop and the Hadoop Ecosystem
2. MapReduce: A Weather Dataset; Data Format; Analyzing the Data with Unix Tools; Analyzing the Data with Hadoop; Map and Reduce; Java MapReduce; Scaling Out; Data Flow; Combiner Functions; Running a Distributed MapReduce Job; Hadoop Streaming; Ruby; Python; Hadoop Pipes; Compiling and Running
Slide 8: 3. The Hadoop Distributed Filesystem: The Design of HDFS; HDFS Concepts; Blocks; Namenodes and Datanodes; The Command-Line Interface; Basic Filesystem Operations; Hadoop Filesystems; Interfaces; The Java Interface; Reading Data from a Hadoop URL; Reading Data Using the FileSystem API; Writing Data; Directories; Querying the Filesystem; Deleting Data; Data Flow; Anatomy of a File Read; Anatomy of a File Write; Coherency Model; Parallel Copying with distcp; Keeping an HDFS Cluster Balanced; Hadoop Archives; Using Hadoop Archives; Limitations
4. Hadoop I/O: Data Integrity; Data Integrity in HDFS; LocalFileSystem; ChecksumFileSystem; Compression; Codecs; Compression and Input Splits; Using Compression in MapReduce; Serialization; The Writable Interface; Writable Classes; Implementing a Custom Writable; Serialization Frameworks; Avro; File-Based Data Structures; SequenceFile
Slide 9: 4. Hadoop I/O (continued): MapFile
5. Developing a MapReduce Application: The Configuration API; Combining Resources; Variable Expansion; Configuring the Development Environment; Managing Configuration; GenericOptionsParser, Tool, and ToolRunner; Writing a Unit Test; Mapper; Reducer; Running Locally on Test Data; Running a Job in a Local Job Runner; Testing the Driver; Running on a Cluster; Packaging; Launching a Job; The MapReduce Web UI; Retrieving the Results; Debugging a Job; Using a Remote Debugger; Tuning a Job; Profiling Tasks; MapReduce Workflows; Decomposing a Problem into MapReduce Jobs; Running Dependent Jobs
6. How MapReduce Works: Anatomy of a MapReduce Job Run; Job Submission; Job Initialization; Task Assignment; Task Execution; Progress and Status Updates; Job Completion; Failures; Task Failure; Tasktracker Failure; Jobtracker Failure; Job Scheduling; The Fair Scheduler; The Capacity Scheduler
Slide 10: 6. How MapReduce Works (continued): Shuffle and Sort; The Map Side; The Reduce Side; Configuration Tuning; Task Execution; Speculative Execution; Task JVM Reuse; Skipping Bad Records; The Task Execution Environment
7. MapReduce Types and Formats: MapReduce Types; The Default MapReduce Job; Input Formats; Input Splits and Records; Text Input; Binary Input; Multiple Inputs; Database Input (and Output); Output Formats; Text Output; Binary Output; Multiple Outputs; Lazy Output; Database Output
8. MapReduce Features: Counters; Built-in Counters; User-Defined Java Counters; User-Defined Streaming Counters; Sorting; Preparation; Partial Sort; Total Sort; Secondary Sort; Joins; Map-Side Joins; Reduce-Side Joins; Side Data Distribution; Using the Job Configuration; Distributed Cache; MapReduce Library Classes
Slide 11: 9. Setting Up a Hadoop Cluster: Cluster Specification; Network Topology; Cluster Setup and Installation; Installing Java; Creating a Hadoop User; Installing Hadoop; Testing the Installation; SSH Configuration; Hadoop Configuration; Configuration Management; Environment Settings; Important Hadoop Daemon Properties; Hadoop Daemon Addresses and Ports; Other Hadoop Properties; User Account Creation; Security; Kerberos and Hadoop; Delegation Tokens; Other Security Enhancements; Benchmarking a Hadoop Cluster; Hadoop Benchmarks; User Jobs; Hadoop in the Cloud; Hadoop on Amazon EC2
10. Administering Hadoop: HDFS; Persistent Data Structures; Safe Mode; Audit Logging; Tools; Monitoring; Logging; Metrics; Java Management Extensions; Maintenance; Routine Administration Procedures; Commissioning and Decommissioning Nodes; Upgrades
11. Pig: Installing and Running Pig
Slide 12: 11. Pig (continued): Execution Types; Running Pig Programs; Grunt; Pig Latin Editors; An Example; Generating Examples; Comparison with Databases; Pig Latin; Structure; Statements; Expressions; Types; Schemas; Functions; User-Defined Functions; A Filter UDF; An Eval UDF; A Load UDF; Data Processing Operators; Loading and Storing Data; Filtering Data; Grouping and Joining Data; Sorting Data; Combining and Splitting Data; Pig in Practice; Parallelism; Parameter Substitution
12. Hive: Installing Hive; The Hive Shell; An Example; Running Hive; Configuring Hive; Hive Services; The Metastore; Comparison with Traditional Databases; Schema on Read Versus Schema on Write; Updates, Transactions, and Indexes; HiveQL; Data Types; Operators and Functions; Tables
Slide 13: 12. Hive (continued): Managed Tables and External Tables; Partitions and Buckets; Storage Formats; Importing Data; Altering Tables; Dropping Tables; Querying Data; Sorting and Aggregating; MapReduce Scripts; Joins; Subqueries; Views; User-Defined Functions; Writing a UDF; Writing a UDAF
13. HBase: HBasics; Backdrop; Concepts; Whirlwind Tour of the Data Model; Implementation; Installation; Test Drive; Clients; Java; Avro, REST, and Thrift; Example; Schemas; Loading Data; Web Queries; HBase Versus RDBMS; Successful Service; HBase; Use Case: HBase at Streamy.com; Praxis; Versions; HDFS; UI; Metrics; Schema Design; Counters; Bulk Load
Slide 14: 14. ZooKeeper: Installing and Running ZooKeeper; An Example; Group Membership in ZooKeeper; Creating the Group; Joining a Group; Listing Members in a Group; Deleting a Group; The ZooKeeper Service; Data Model; Operations; Implementation; Consistency; Sessions; States; Building Applications with ZooKeeper; A Configuration Service; The Resilient ZooKeeper Application; A Lock Service; More Distributed Data Structures and Protocols; ZooKeeper in Production; Resilience and Performance; Configuration
15. Sqoop: Getting Sqoop; A Sample Import; Generated Code; Additional Serialization Systems; Database Imports: A Deeper Look; Controlling the Import; Imports and Consistency; Direct-mode Imports; Working with Imported Data; Imported Data and Hive; Importing Large Objects; Performing an Export; Exports: A Deeper Look; Exports and Transactionality; Exports and SequenceFiles
16. Case Studies: Hadoop Usage at Last.fm
Slide 15: 16. Case Studies (continued): Last.fm: The Social Music Revolution; Hadoop at Last.fm; Generating Charts with Hadoop; The Track Statistics Program; Summary; Hadoop and Hive at Facebook; Introduction; Hadoop at Facebook; Hypothetical Use Case Studies; Hive; Problems and Future Work; Nutch Search Engine; Background; Data Structures; Selected Examples of Hadoop Data Processing in Nutch; Summary; Log Processing at Rackspace; Requirements/The Problem; Brief History; Choosing Hadoop; Collection and Storage; MapReduce for Logs; Cascading; Fields, Tuples, and Pipes; Operations; Taps, Schemes, and Flows; Cascading in Practice; Flexibility; Hadoop and Cascading at ShareThis; Summary; TeraByte Sort on Apache Hadoop; Using Pig and Wukong to Explore Billion-edge Network Graphs; Measuring Community; Everybody’s Talkin’ at Me: The Twitter Reply Graph; Symmetric Links; Community Extraction
A. Installing Apache Hadoop
B. Cloudera’s Distribution for Hadoop
C. Preparing the NCDC Weather Data
Slide 16: Index
Slide 17: Foreword Hadoop got its start in Nutch. A few of us were attempting to build an open source web search engine and having trouble managing computations running on even a handful of computers. Once Google published its GFS and MapReduce papers, the route became clear. They’d devised systems to solve precisely the problems we were having with Nutch. So we started, two of us, half-time, to try to re-create these systems as a part of Nutch. We managed to get Nutch limping along on 20 machines, but it soon became clear that to handle the Web’s massive scale, we’d need to run it on thousands of machines and, moreover, that the job was bigger than two half-time developers could handle. Around that time, Yahoo! got interested, and quickly put together a team that I joined. We split off the distributed computing part of Nutch, naming it Hadoop. With the help of Yahoo!, Hadoop soon grew into a technology that could truly scale to the Web. In 2006, Tom White started contributing to Hadoop. I already knew Tom through an excellent article he’d written about Nutch, so I knew he could present complex ideas in clear prose. I soon learned that he could also develop software that was as pleasant to read as his prose. From the beginning, Tom’s contributions to Hadoop showed his concern for users and for the project. Unlike most open source contributors, Tom is not primarily interested in tweaking the system to better meet his own needs, but rather in making it easier for anyone to use. Initially, Tom specialized in making Hadoop run well on Amazon’s EC2 and S3 services. Then he moved on to tackle a wide variety of problems, including improving the MapReduce APIs, enhancing the website, and devising an object serialization framework. In all cases, Tom presented his ideas precisely. In short order, Tom earned the role of Hadoop committer and soon thereafter became a member of the Hadoop Project Management Committee. 
Tom is now a respected senior member of the Hadoop developer community. Though he’s an expert in many technical corners of the project, his specialty is making Hadoop easier to use and understand.
Slide 18: Given this, I was very pleased when I learned that Tom intended to write a book about Hadoop. Who could be better qualified? Now you have the opportunity to learn about Hadoop from a master—not only of the technology, but also of common sense and plain talk.
—Doug Cutting
Shed in the Yard, California
Slide 19: Preface Martin Gardner, the mathematics and science writer, once said in an interview: Beyond calculus, I am lost. That was the secret of my column’s success. It took me so long to understand what I was writing about that I knew how to write in a way most readers would understand.* In many ways, this is how I feel about Hadoop. Its inner workings are complex, resting as they do on a mixture of distributed systems theory, practical engineering, and common sense. And to the uninitiated, Hadoop can appear alien. But it doesn’t need to be like this. Stripped to its core, the tools that Hadoop provides for building distributed systems—for data storage, data analysis, and coordination— are simple. If there’s a common theme, it is about raising the level of abstraction—to create building blocks for programmers who just happen to have lots of data to store, or lots of data to analyze, or lots of machines to coordinate, and who don’t have the time, the skill, or the inclination to become distributed systems experts to build the infrastructure to handle it. With such a simple and generally applicable feature set, it seemed obvious to me when I started using it that Hadoop deserved to be widely used. However, at the time (in early 2006), setting up, configuring, and writing programs to use Hadoop was an art. Things have certainly improved since then: there is more documentation, there are more examples, and there are thriving mailing lists to go to when you have questions. And yet the biggest hurdle for newcomers is understanding what this technology is capable of, where it excels, and how to use it. That is why I wrote this book. The Apache Hadoop community has come a long way. Over the course of three years, the Hadoop project has blossomed and spun off half a dozen subprojects. In this time, the software has made great leaps in performance, reliability, scalability, and manageability. 
To gain even wider adoption, however, I believe we need to make Hadoop even easier to use. This will involve writing more tools; integrating with more systems; and
* “The science of fun,” Alex Bellos, The Guardian, May 31, 2008, http://www.guardian.co.uk/science/2008/may/31/maths.science.
Slide 20: writing new, improved APIs. I’m looking forward to being a part of this, and I hope this book will encourage and enable others to do so, too. Administrative Notes During discussion of a particular Java class in the text, I often omit its package name, to reduce clutter. If you need to know which package a class is in, you can easily look it up in Hadoop’s Java API documentation for the relevant subproject, linked to from the Apache Hadoop home page at http://hadoop.apache.org/. Or if you’re using an IDE, it can help using its auto-complete mechanism. Similarly, although it deviates from usual style guidelines, program listings that import multiple classes from the same package may use the asterisk wildcard character to save space (for example: import org.apache.hadoop.io.*). The sample programs in this book are available for download from the website that accompanies this book: http://www.hadoopbook.com/. You will also find instructions there for obtaining the datasets that are used in examples throughout the book, as well as further notes for running the programs in the book, and links to updates, additional resources, and my blog. What’s in This Book? The rest of this book is organized as follows. Chapter 1 emphasizes the need for Hadoop and sketches the history of the project. Chapter 2 provides an introduction to MapReduce. Chapter 3 looks at Hadoop filesystems, and in particular HDFS, in depth. Chapter 4 covers the fundamentals of I/O in Hadoop: data integrity, compression, serialization, and file-based data structures. The next four chapters cover MapReduce in depth. Chapter 5 goes through the practical steps needed to develop a MapReduce application. Chapter 6 looks at how MapReduce is implemented in Hadoop, from the point of view of a user. Chapter 7 is about the MapReduce programming model, and the various data formats that MapReduce can work with. Chapter 8 is on advanced MapReduce topics, including sorting and joining data. 
Chapters 9 and 10 are for Hadoop administrators, and describe how to set up and maintain a Hadoop cluster running HDFS and MapReduce. Later chapters are dedicated to projects that build on Hadoop or are related to it. Chapters 11 and 12 present Pig and Hive, which are analytics platforms built on HDFS and MapReduce, whereas Chapters 13, 14, and 15 cover HBase, ZooKeeper, and Sqoop, respectively. Finally, Chapter 16 is a collection of case studies contributed by members of the Apache Hadoop community.
Slide 21: What’s New in the Second Edition?
The second edition has two new chapters on Hive and Sqoop (Chapters 12 and 15), a new section covering Avro (in Chapter 4), an introduction to the new security features in Hadoop (in Chapter 9), and a new case study on analyzing massive network graphs using Hadoop (in Chapter 16). This edition continues to describe the 0.20 release series of Apache Hadoop, since this was the latest stable release at the time of writing. New features from later releases are occasionally mentioned in the text, however, with reference to the version that they were introduced in.
Conventions Used in This Book
The following typographical conventions are used in this book:
Italic: Indicates new terms, URLs, email addresses, filenames, and file extensions.
Constant width: Used for program listings, as well as within paragraphs to refer to program elements such as variable or function names, databases, data types, environment variables, statements, and keywords.
Constant width bold: Shows commands or other text that should be typed literally by the user.
Constant width italic: Shows text that should be replaced with user-supplied values or by values determined by context.
Using Code Examples
This book is here to help you get your job done. In general, you may use the code in this book in your programs and documentation. You do not need to contact us for permission unless you’re reproducing a significant portion of the code. For example, writing a program that uses several chunks of code from this book does not require permission. Selling or distributing a CD-ROM of examples from O’Reilly books does
Slide 22: require permission. Answering a question by citing this book and quoting example code does not require permission. Incorporating a significant amount of example code from this book into your product’s documentation does require permission. We appreciate, but do not require, attribution. An attribution usually includes the title, author, publisher, and ISBN. For example: “Hadoop: The Definitive Guide, Second Edition, by Tom White. Copyright 2011 Tom White, 978-1-449-38973-4.” If you feel your use of code examples falls outside fair use or the permission given above, feel free to contact us at permissions@oreilly.com. Safari® Books Online Safari Books Online is an on-demand digital library that lets you easily search over 7,500 technology and creative reference books and videos to find the answers you need quickly. With a subscription, you can read any page and watch any video from our library online. Read books on your cell phone and mobile devices. Access new titles before they are available for print, and get exclusive access to manuscripts in development and post feedback for the authors. Copy and paste code samples, organize your favorites, download chapters, bookmark key sections, create notes, print out pages, and benefit from tons of other time-saving features. O’Reilly Media has uploaded this book to the Safari Books Online service. To have full digital access to this book and others on similar topics from O’Reilly and other publishers, sign up for free at http://my.safaribooksonline.com. How to Contact Us Please address comments and questions concerning this book to the publisher: O’Reilly Media, Inc. 1005 Gravenstein Highway North Sebastopol, CA 95472 800-998-9938 (in the United States or Canada) 707-829-0515 (international or local) 707-829-0104 (fax) We have a web page for this book, where we list errata, examples, and any additional information. 
You can access this page at: http://oreilly.com/catalog/0636920010388/ The author also has a site for this book at: http://www.hadoopbook.com/
Slide 23: To comment or ask technical questions about this book, send email to: bookquestions@oreilly.com For more information about our books, conferences, Resource Centers, and the O’Reilly Network, see our website at: http://www.oreilly.com Acknowledgments I have relied on many people, both directly and indirectly, in writing this book. I would like to thank the Hadoop community, from whom I have learned, and continue to learn, a great deal. In particular, I would like to thank Michael Stack and Jonathan Gray for writing the chapter on HBase. Also thanks go to Adrian Woodhead, Marc de Palol, Joydeep Sen Sarma, Ashish Thusoo, Andrzej Białecki, Stu Hood, Chris K. Wensel, and Owen O’Malley for contributing case studies for Chapter 16. I would like to thank the following reviewers who contributed many helpful suggestions and improvements to my drafts: Raghu Angadi, Matt Biddulph, Christophe Bisciglia, Ryan Cox, Devaraj Das, Alex Dorman, Chris Douglas, Alan Gates, Lars George, Patrick Hunt, Aaron Kimball, Peter Krey, Hairong Kuang, Simon Maxen, Olga Natkovich, Benjamin Reed, Konstantin Shvachko, Allen Wittenauer, Matei Zaharia, and Philip Zeyliger. Ajay Anand kept the review process flowing smoothly. Philip (“flip”) Kromer kindly helped me with the NCDC weather dataset featured in the examples in this book. Special thanks to Owen O’Malley and Arun C. Murthy for explaining the intricacies of the MapReduce shuffle to me. Any errors that remain are, of course, to be laid at my door. For the second edition, I owe a debt of gratitude for the detailed review and feedback from Jeff Bean, Doug Cutting, Glynn Durham, Alan Gates, Jeff Hammerbacher, Alex Kozlov, Ken Krugler, Jimmy Lin, Todd Lipcon, Sarah Sproehnle, Vinithra Varadharajan, and Ian Wrigley, as well as all the readers who submitted errata for the first edition. I would also like to thank Aaron Kimball for contributing the chapter on Sqoop, and Philip (“flip”) Kromer for the case study on graph processing. 
I am particularly grateful to Doug Cutting for his encouragement, support, and friendship, and for contributing the foreword. Thanks also go to the many others with whom I have had conversations or email discussions over the course of writing the book. Halfway through writing this book, I joined Cloudera, and I want to thank my colleagues for being incredibly supportive in allowing me the time to write, and to get it finished promptly.
Slide 24: I am grateful to my editor, Mike Loukides, and his colleagues at O’Reilly for their help in the preparation of this book. Mike has been there throughout to answer my questions, to read my first drafts, and to keep me on schedule. Finally, the writing of this book has been a great deal of work, and I couldn’t have done it without the constant support of my family. My wife, Eliane, not only kept the home going, but also stepped in to help review, edit, and chase case studies. My daughters, Emilia and Lottie, have been very understanding, and I’m looking forward to spending lots more time with all of them.
Slide 25: CHAPTER 1 Meet Hadoop
In pioneer days they used oxen for heavy pulling, and when one ox couldn’t budge a log, they didn’t try to grow a larger ox. We shouldn’t be trying for bigger computers, but for more systems of computers. —Grace Hopper
Data!
We live in the data age. It’s not easy to measure the total volume of data stored electronically, but an IDC estimate put the size of the “digital universe” at 0.18 zettabytes in 2006, and forecast a tenfold growth by 2011 to 1.8 zettabytes.* A zettabyte is 10^21 bytes, or equivalently one thousand exabytes, one million petabytes, or one billion terabytes. That’s roughly the same order of magnitude as one disk drive for every person in the world. This flood of data is coming from many sources. Consider the following:†
• The New York Stock Exchange generates about one terabyte of new trade data per day.
• Facebook hosts approximately 10 billion photos, taking up one petabyte of storage.
• Ancestry.com, the genealogy site, stores around 2.5 petabytes of data.
• The Internet Archive stores around 2 petabytes of data, and is growing at a rate of 20 terabytes per month.
• The Large Hadron Collider near Geneva, Switzerland, will produce about 15 petabytes of data per year.
* From Gantz et al., “The Diverse and Exploding Digital Universe,” March 2008 (http://www.emc.com/collateral/analyst-reports/diverse-exploding-digital-universe.pdf).
† http://www.intelligententerprise.com/showArticle.jhtml?articleID=207800705, http://mashable.com/2008/10/15/facebook-10-billion-photos/, http://blog.familytreemagazine.com/insider/Inside+Ancestrycoms+TopSecret+Data+Center.aspx, http://www.archive.org/about/faqs.php, and http://www.interactions.org/cms/?pid=1027032.
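The unit conversions in the paragraph above can be checked with a few lines of Java. This is a minimal sketch, not code from the book: the class and method names are illustrative, decimal (SI) units are assumed, and the world population of roughly 6.8 billion is an outside assumption used only to illustrate the "one disk drive for every person" remark.

```java
class DigitalUniverse {
    // Decimal (SI) exponents: one unit equals 10^exponent bytes.
    static final int TERA = 12, PETA = 15, EXA = 18, ZETTA = 21;

    /** Number of the given smaller units that make up one zettabyte. */
    static long perZettabyte(int unitExponent) {
        long n = 1;
        for (int i = unitExponent; i < ZETTA; i++) {
            n *= 10;
        }
        return n;
    }

    /** Gigabytes per person if the given number of zettabytes were shared evenly. */
    static double gigabytesPerPerson(double zettabytes, double people) {
        return zettabytes * 1e12 / people; // 1 ZB = 10^12 GB
    }

    public static void main(String[] args) {
        System.out.println("Exabytes per zettabyte:  " + perZettabyte(EXA));
        System.out.println("Petabytes per zettabyte: " + perZettabyte(PETA));
        System.out.println("Terabytes per zettabyte: " + perZettabyte(TERA));
        // Assumption: about 6.8 billion people; this figure is not from the text.
        double perPerson = gigabytesPerPerson(1.8, 6.8e9);
        System.out.printf("GB per person at 1.8 ZB: %.0f%n", perPerson);
    }
}
```

Under that population assumption the forecast 1.8 zettabytes works out to a few hundred gigabytes per person, which is in the range of a single consumer disk drive of the period.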
Slide 26: So there’s a lot of data out there. But you are probably wondering how it affects you. Most of the data is locked up in the largest web properties (like search engines), or scientific or financial institutions, isn’t it? Does the advent of “Big Data,” as it is being called, affect smaller organizations or individuals? I argue that it does. Take photos, for example. My wife’s grandfather was an avid photographer, and took photographs throughout his adult life. His entire corpus of medium format, slide, and 35mm film, when scanned in at high-resolution, occupies around 10 gigabytes. Compare this to the digital photos that my family took in 2008, which take up about 5 gigabytes of space. My family is producing photographic data at 35 times the rate my wife’s grandfather’s did, and the rate is increasing every year as it becomes easier to take more and more photos. More generally, the digital streams that individuals are producing are growing apace. Microsoft Research’s MyLifeBits project gives a glimpse of archiving of personal information that may become commonplace in the near future. MyLifeBits was an experiment where an individual’s interactions—phone calls, emails, documents—were captured electronically and stored for later access. The data gathered included a photo taken every minute, which resulted in an overall data volume of one gigabyte a month. When storage costs come down enough to make it feasible to store continuous audio and video, the data volume for a future MyLifeBits service will be many times that. The trend is for every individual’s data footprint to grow, but perhaps more important, the amount of data generated by machines will be even greater than that generated by people. Machine logs, RFID readers, sensor networks, vehicle GPS traces, retail transactions—all of these contribute to the growing mountain of data. The volume of data being made publicly available increases every year, too. 
Organizations no longer have to merely manage their own data: success in the future will be dictated to a large extent by their ability to extract value from other organizations’ data. Initiatives such as Public Data Sets on Amazon Web Services, Infochimps.org, and theinfo.org exist to foster the “information commons,” where data can be freely (or in the case of AWS, for a modest price) shared for anyone to download and analyze. Mashups between different information sources make for unexpected and hitherto unimaginable applications. Take, for example, the Astrometry.net project, which watches the Astrometry group on Flickr for new photos of the night sky. It analyzes each image and identifies which part of the sky it is from, as well as any interesting celestial bodies, such as stars or galaxies. This project shows the kind of things that are possible when data (in this case, tagged photographic images) is made available and used for something (image analysis) that was not anticipated by the creator. It has been said that “More data usually beats better algorithms,” which is to say that for some problems (such as recommending movies or music based on past preferences),
Slide 27: however fiendish your algorithms are, they can often be beaten simply by having more data (and a less sophisticated algorithm).‡ The good news is that Big Data is here. The bad news is that we are struggling to store and analyze it. Data Storage and Analysis The problem is simple: while the storage capacities of hard drives have increased massively over the years, access speeds—the rate at which data can be read from drives— have not kept up. One typical drive from 1990 could store 1,370 MB of data and had a transfer speed of 4.4 MB/s,§ so you could read all the data from a full drive in around five minutes. Over 20 years later, one terabyte drives are the norm, but the transfer speed is around 100 MB/s, so it takes more than two and a half hours to read all the data off the disk. This is a long time to read all data on a single drive—and writing is even slower. The obvious way to reduce the time is to read from multiple disks at once. Imagine if we had 100 drives, each holding one hundredth of the data. Working in parallel, we could read the data in under two minutes. Only using one hundredth of a disk may seem wasteful. But we can store one hundred datasets, each of which is one terabyte, and provide shared access to them. We can imagine that the users of such a system would be happy to share access in return for shorter analysis times, and, statistically, that their analysis jobs would be likely to be spread over time, so they wouldn’t interfere with each other too much. There’s more to being able to read and write data in parallel to or from multiple disks, though. The first problem to solve is hardware failure: as soon as you start using many pieces of hardware, the chance that one will fail is fairly high. A common way of avoiding data loss is through replication: redundant copies of the data are kept by the system so that in the event of failure, there is another copy available. 
This is how RAID works, for instance, although Hadoop’s filesystem, the Hadoop Distributed Filesystem (HDFS), takes a slightly different approach, as you shall see later. The second problem is that most analysis tasks need to be able to combine the data in some way; data read from one disk may need to be combined with the data from any of the other 99 disks. Various distributed systems allow data to be combined from multiple sources, but doing this correctly is notoriously challenging. MapReduce provides a programming model that abstracts the problem from disk reads and writes,
‡ The quote is from Anand Rajaraman writing about the Netflix Challenge (http://anand.typepad.com/datawocky/2008/03/more-data-usual.html). Alon Halevy, Peter Norvig, and Fernando Pereira make the same point in “The Unreasonable Effectiveness of Data,” IEEE Intelligent Systems, March/April 2009.
§ These specifications are for the Seagate ST-41600n.
Slide 28: transforming it into a computation over sets of keys and values. We will look at the details of this model in later chapters, but the important point for the present discussion is that there are two parts to the computation, the map and the reduce, and it’s the interface between the two where the “mixing” occurs. Like HDFS, MapReduce has built-in reliability. This, in a nutshell, is what Hadoop provides: a reliable shared storage and analysis system. The storage is provided by HDFS and analysis by MapReduce. There are other parts to Hadoop, but these capabilities are its kernel. Comparison with Other Systems The approach taken by MapReduce may seem like a brute-force approach. The premise is that the entire dataset—or at least a good portion of it—is processed for each query. But this is its power. MapReduce is a batch query processor, and the ability to run an ad hoc query against your whole dataset and get the results in a reasonable time is transformative. It changes the way you think about data, and unlocks data that was previously archived on tape or disk. It gives people the opportunity to innovate with data. Questions that took too long to get answered before can now be answered, which in turn leads to new questions and new insights. For example, Mailtrust, Rackspace’s mail division, used Hadoop for processing email logs. One ad hoc query they wrote was to find the geographic distribution of their users. In their words: This data was so useful that we’ve scheduled the MapReduce job to run monthly and we will be using this data to help us decide which Rackspace data centers to place new mail servers in as we grow. By bringing several hundred gigabytes of data together and having the tools to analyze it, the Rackspace engineers were able to gain an understanding of the data that they otherwise would never have had, and, furthermore, they were able to use what they had learned to improve the service for their customers. 
You can read more about how Rackspace uses Hadoop in Chapter 16. RDBMS Why can’t we use databases with lots of disks to do large-scale batch analysis? Why is MapReduce needed?
Slide 29: The answer to these questions comes from another trend in disk drives: seek time is improving more slowly than transfer rate. Seeking is the process of moving the disk’s head to a particular place on the disk to read or write data. It characterizes the latency of a disk operation, whereas the transfer rate corresponds to a disk’s bandwidth. If the data access pattern is dominated by seeks, it will take longer to read or write large portions of the dataset than streaming through it, which operates at the transfer rate. On the other hand, for updating a small proportion of records in a database, a traditional B-Tree (the data structure used in relational databases, which is limited by the rate at which it can perform seeks) works well. For updating the majority of a database, a B-Tree is less efficient than MapReduce, which uses Sort/Merge to rebuild the database. In many ways, MapReduce can be seen as a complement to an RDBMS. (The differences between the two systems are shown in Table 1-1.) MapReduce is a good fit for problems that need to analyze the whole dataset, in a batch fashion, particularly for ad hoc analysis. An RDBMS is good for point queries or updates, where the dataset has been indexed to deliver low-latency retrieval and update times of a relatively small amount of data. MapReduce suits applications where the data is written once, and read many times, whereas a relational database is good for datasets that are continually updated.
Table 1-1. RDBMS compared to MapReduce
             Traditional RDBMS            MapReduce
Data size    Gigabytes                    Petabytes
Access       Interactive and batch        Batch
Updates      Read and write many times    Write once, read many times
Structure    Static schema                Dynamic schema
Integrity    High                         Low
Scaling      Nonlinear                    Linear
Another difference between MapReduce and an RDBMS is the amount of structure in the datasets that they operate on. 
Structured data is data that is organized into entities that have a defined format, such as XML documents or database tables that conform to a particular predefined schema. This is the realm of the RDBMS. Semi-structured data, on the other hand, is looser, and though there may be a schema, it is often ignored, so it may be used only as a guide to the structure of the data: for example, a spreadsheet, in which the structure is the grid of cells, although the cells themselves may hold any form of data. Unstructured data does not have any particular internal structure: for example, plain text or image data. MapReduce works well on unstructured or semi-structured data, since it is designed to interpret the data at processing time. In other words, the input keys and values for MapReduce are not an intrinsic property of the data, but they are chosen by the person analyzing the data.
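As a rough illustration of keys and values being chosen at processing time, the sketch below interprets the same raw web server log lines in two different ways. The log layout (client host, requested path, and status code separated by spaces), the sample lines, and the function names are all assumptions made for this example; none of them is a Hadoop API.

```python
# Hypothetical log lines of the assumed form "<client host> <path> <status>".
LOG_LINES = [
    "alpha.example.com /index.html 200",
    "beta.example.com /missing 404",
    "alpha.example.com /about.html 200",
]


def key_by_host(line):
    """Interpret a line as (client host, 1), e.g. to count hits per client."""
    host, _path, _status = line.split()
    return host, 1


def key_by_status(line):
    """Interpret the same line as (status code, path), e.g. to list failing URLs."""
    _host, path, status = line.split()
    return int(status), path


# Nothing in the stored text fixes the key: each analysis picks its own.
pairs_by_host = [key_by_host(line) for line in LOG_LINES]
pairs_by_status = [key_by_status(line) for line in LOG_LINES]
```

The stored lines never change; only the function that reads them does, which is the sense in which the keys and values are not an intrinsic property of the data.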
Slide 30: Relational data is often normalized to retain its integrity and remove redundancy. Normalization poses problems for MapReduce, since it makes reading a record a nonlocal operation, and one of the central assumptions that MapReduce makes is that it is possible to perform (high-speed) streaming reads and writes. A web server log is a good example of a set of records that is not normalized (for example, the client hostnames are specified in full each time, even though the same client may appear many times), and this is one reason that logfiles of all kinds are particularly well-suited to analysis with MapReduce. MapReduce is a linearly scalable programming model. The programmer writes two functions—a map function and a reduce function—each of which defines a mapping from one set of key-value pairs to another. These functions are oblivious to the size of the data or the cluster that they are operating on, so they can be used unchanged for a small dataset and for a massive one. More important, if you double the size of the input data, a job will run twice as slow. But if you also double the size of the cluster, a job will run as fast as the original one. This is not generally true of SQL queries. Over time, however, the differences between relational databases and MapReduce systems are likely to blur—both as relational databases start incorporating some of the ideas from MapReduce (such as Aster Data’s and Greenplum’s databases) and, from the other direction, as higher-level query languages built on MapReduce (such as Pig and Hive) make MapReduce systems more approachable to traditional database programmers.‖ Grid Computing The High Performance Computing (HPC) and Grid Computing communities have been doing large-scale data processing for years, using such APIs as Message Passing Interface (MPI). Broadly, the approach in HPC is to distribute the work across a cluster of machines, which access a shared filesystem, hosted by a SAN. 
This works well for predominantly compute-intensive jobs, but becomes a problem when nodes need to access larger data volumes (hundreds of gigabytes, the point at which MapReduce really starts to shine), since the network bandwidth is the bottleneck and compute nodes become idle.
‖ In January 2008, David J. DeWitt and Michael Stonebraker caused a stir by publishing “MapReduce: A major step backwards” (http://databasecolumn.vertica.com/database-innovation/mapreduce-a-major-step-backwards), in which they criticized MapReduce for being a poor substitute for relational databases. Many commentators argued that it was a false comparison (see, for example, Mark C. Chu-Carroll’s “Databases are hammers; MapReduce is a screwdriver,” http://scienceblogs.com/goodmath/2008/01/databases_are_hammers_mapreduc.php), and DeWitt and Stonebraker followed up with “MapReduce II” (http://databasecolumn.vertica.com/database-innovation/mapreduce-ii), where they addressed the main topics brought up by others.
Slide 31: MapReduce tries to collocate the data with the compute node, so data access is fast since it is local.# This feature, known as data locality, is at the heart of MapReduce and is the reason for its good performance. Recognizing that network bandwidth is the most precious resource in a data center environment (it is easy to saturate network links by copying data around), MapReduce implementations go to great lengths to conserve it by explicitly modeling network topology. Notice that this arrangement does not preclude high-CPU analyses in MapReduce. MPI gives great control to the programmer, but requires that he or she explicitly handle the mechanics of the data flow, exposed via low-level C routines and constructs, such as sockets, as well as the higher-level algorithm for the analysis. MapReduce operates only at the higher level: the programmer thinks in terms of functions of key and value pairs, and the data flow is implicit. Coordinating the processes in a large-scale distributed computation is a challenge. The hardest aspect is gracefully handling partial failure—when you don’t know if a remote process has failed or not—and still making progress with the overall computation. MapReduce spares the programmer from having to think about failure, since the implementation detects failed map or reduce tasks and reschedules replacements on machines that are healthy. MapReduce is able to do this since it is a shared-nothing architecture, meaning that tasks have no dependence on one another. (This is a slight oversimplification, since the output from mappers is fed to the reducers, but this is under the control of the MapReduce system; in this case, it needs to take more care rerunning a failed reducer than rerunning a failed map, since it has to make sure it can retrieve the necessary map outputs, and if not, regenerate them by running the relevant maps again.) So from the programmer’s point of view, the order in which the tasks run doesn’t matter. 
By contrast, MPI programs have to explicitly manage their own checkpointing and recovery, which gives more control to the programmer, but makes them more difficult to write. MapReduce might sound like quite a restrictive programming model, and in a sense it is: you are limited to key and value types that are related in specified ways, and mappers and reducers run with very limited coordination between one another (the mappers pass keys and values to reducers). A natural question to ask is: can you do anything useful or nontrivial with it? The answer is yes. MapReduce was invented by engineers at Google as a system for building production search indexes because they found themselves solving the same problem over and over again (and MapReduce was inspired by older ideas from the functional programming, distributed computing, and database communities), but it has since been used for many other applications in many other industries. It is pleasantly surprising to see the range of algorithms that can be expressed in MapReduce, from
# Jim Gray was an early advocate of putting the computation near the data. See “Distributed Computing Economics,” March 2003, http://research.microsoft.com/apps/pubs/default.aspx?id=70001.
Slide 32: image analysis, to graph-based problems, to machine learning algorithms.* It can’t solve every problem, of course, but it is a general data-processing tool. You can see a sample of some of the applications that Hadoop has been used for in Chapter 16. Volunteer Computing When people first hear about Hadoop and MapReduce, they often ask, “How is it different from SETI@home?” SETI, the Search for Extra-Terrestrial Intelligence, runs a project called SETI@home in which volunteers donate CPU time from their otherwise idle computers to analyze radio telescope data for signs of intelligent life outside Earth. SETI@home is the best known of many volunteer computing projects; others include the Great Internet Mersenne Prime Search (to search for large prime numbers) and Folding@home (to understand protein folding and how it relates to disease). Volunteer computing projects work by breaking the problem they are trying to solve into chunks called work units, which are sent to computers around the world to be analyzed. For example, a SETI@home work unit is about 0.35 MB of radio telescope data, and takes hours or days to analyze on a typical home computer. When the analysis is completed, the results are sent back to the server, and the client gets another work unit. As a precaution to combat cheating, each work unit is sent to three different machines and needs at least two results to agree to be accepted. Although SETI@home may be superficially similar to MapReduce (breaking a problem into independent pieces to be worked on in parallel), there are some significant differences. The SETI@home problem is very CPU-intensive, which makes it suitable for running on hundreds of thousands of computers across the world,† since the time to transfer the work unit is dwarfed by the time to run the computation on it. Volunteers are donating CPU cycles, not bandwidth. 
MapReduce is designed to run jobs that last minutes or hours on trusted, dedicated hardware running in a single data center with very high aggregate bandwidth interconnects. By contrast, SETI@home runs a perpetual computation on untrusted machines on the Internet with highly variable connection speeds and no data locality.
* Apache Mahout (http://mahout.apache.org/) is a project to build machine learning libraries (such as classification and clustering algorithms) that run on Hadoop.
† In January 2008, SETI@home was reported at http://www.planetary.org/programs/projects/setiathome/setiathome_20080115.html to be processing 300 gigabytes a day, using 320,000 computers (most of which are not dedicated to SETI@home; they are used for other things, too).
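To make the programming model compared in this section more concrete, here is a minimal single-process sketch in Python of the map, group-by-key, and reduce steps. It is not Hadoop's API: `run_mapreduce`, `map_words`, and `reduce_counts` are hypothetical names, the input lines are invented, and a real framework would spread the tasks over many machines and rerun the ones that fail.

```python
from collections import defaultdict


def run_mapreduce(records, map_fn, reduce_fn):
    """Apply map_fn to every record, group the emitted values by key,
    then apply reduce_fn to each key and its list of values."""
    groups = defaultdict(list)
    for record in records:
        for key, value in map_fn(record):
            groups[key].append(value)  # the "mixing" between map and reduce
    return {key: reduce_fn(key, values) for key, values in sorted(groups.items())}


def map_words(line):
    """Emit (word, 1) for every whitespace-separated word in a line."""
    for word in line.lower().split():
        yield word, 1


def reduce_counts(word, counts):
    """Sum the counts collected for one word."""
    return sum(counts)


lines = ["more data beats better algorithms", "more data more insight"]
word_counts = run_mapreduce(lines, map_words, reduce_counts)
print(word_counts["more"], word_counts["data"])  # 3 2
```

The two user-supplied functions never mention how many records or machines are involved, which is what lets the same pair run unchanged on a small dataset or a massive one.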
Slide 33: A Brief History of Hadoop Hadoop was created by Doug Cutting, the creator of Apache Lucene, the widely used text search library. Hadoop has its origins in Apache Nutch, an open source web search engine, itself a part of the Lucene project. The Origin of the Name “Hadoop” The name Hadoop is not an acronym; it’s a made-up name. The project’s creator, Doug Cutting, explains how the name came about: The name my kid gave a stuffed yellow elephant. Short, relatively easy to spell and pronounce, meaningless, and not used elsewhere: those are my naming criteria. Kids are good at generating such. Googol is a kid’s term. Subprojects and “contrib” modules in Hadoop also tend to have names that are unrelated to their function, often with an elephant or other animal theme (“Pig,” for example). Smaller components are given more descriptive (and therefore more mundane) names. This is a good principle, as it means you can generally work out what something does from its name. For example, the jobtracker‡ keeps track of MapReduce jobs. Building a web search engine from scratch was an ambitious goal, for not only is the software required to crawl and index websites complex to write, but it is also a challenge to run without a dedicated operations team, since there are so many moving parts. It’s expensive, too: Mike Cafarella and Doug Cutting estimated a system supporting a 1-billion-page index would cost around half a million dollars in hardware, with a monthly running cost of $30,000.§ Nevertheless, they believed it was a worthy goal, as it would open up and ultimately democratize search engine algorithms. Nutch was started in 2002, and a working crawler and search system quickly emerged. However, they realized that their architecture wouldn’t scale to the billions of pages on the Web. 
Help was at hand with the publication of a paper in 2003 that described the architecture of Google’s distributed filesystem, called GFS, which was being used in production at Google.‖ GFS, or something like it, would solve their storage needs for the very large files generated as a part of the web crawl and indexing process. In particular, GFS would free up time being spent on administrative tasks such as managing storage nodes. In 2004, they set about writing an open source implementation, the Nutch Distributed Filesystem (NDFS).
‡ In this book, we use the lowercase form, “jobtracker,” to denote the entity when it’s being referred to generally, and the CamelCase form JobTracker to denote the Java class that implements it.
§ Mike Cafarella and Doug Cutting, “Building Nutch: Open Source Search,” ACM Queue, April 2004, http://queue.acm.org/detail.cfm?id=988408.
‖ Sanjay Ghemawat, Howard Gobioff, and Shun-Tak Leung, “The Google File System,” October 2003, http://labs.google.com/papers/gfs.html.
Slide 34: In 2004, Google published the paper that introduced MapReduce to the world.# Early in 2005, the Nutch developers had a working MapReduce implementation in Nutch, and by the middle of that year all the major Nutch algorithms had been ported to run using MapReduce and NDFS. NDFS and the MapReduce implementation in Nutch were applicable beyond the realm of search, and in February 2006 they moved out of Nutch to form an independent subproject of Lucene called Hadoop. At around the same time, Doug Cutting joined Yahoo!, which provided a dedicated team and the resources to turn Hadoop into a system that ran at web scale (see sidebar). This was demonstrated in February 2008 when Yahoo! announced that its production search index was being generated by a 10,000-core Hadoop cluster.* In January 2008, Hadoop was made its own top-level project at Apache, confirming its success and its diverse, active community. By this time, Hadoop was being used by many other companies besides Yahoo!, such as Last.fm, Facebook, and the New York Times. Some applications are covered in the case studies in Chapter 16 and on the Hadoop wiki. In one well-publicized feat, the New York Times used Amazon’s EC2 compute cloud to crunch through four terabytes of scanned archives from the paper converting them to PDFs for the Web.† The processing took less than 24 hours to run using 100 machines, and the project probably wouldn’t have been embarked on without the combination of Amazon’s pay-by-the-hour model (which allowed the NYT to access a large number of machines for a short period) and Hadoop’s easy-to-use parallel programming model. In April 2008, Hadoop broke a world record to become the fastest system to sort a terabyte of data. Running on a 910-node cluster, Hadoop sorted one terabyte in 209 seconds (just under 3½ minutes), beating the previous year’s winner of 297 seconds (described in detail in “TeraByte Sort on Apache Hadoop” on page 553). 
In November of the same year, Google reported that its MapReduce implementation sorted one terabyte in 68 seconds.‡ As the first edition of this book was going to press (May 2009), it was announced that a team at Yahoo! used Hadoop to sort one terabyte in 62 seconds.
# Jeffrey Dean and Sanjay Ghemawat, “MapReduce: Simplified Data Processing on Large Clusters,” December 2004, http://labs.google.com/papers/mapreduce.html.
* “Yahoo! Launches World’s Largest Hadoop Production Application,” 19 February 2008, http://developer.yahoo.net/blogs/hadoop/2008/02/yahoo-worlds-largest-production-hadoop.html.
† Derek Gottfrid, “Self-service, Prorated Super Computing Fun!” 1 November 2007, http://open.blogs.nytimes.com/2007/11/01/self-service-prorated-super-computing-fun/.
‡ “Sorting 1PB with MapReduce,” 21 November 2008, http://googleblog.blogspot.com/2008/11/sorting-1pb-with-mapreduce.html.
Slide 35: Hadoop at Yahoo! Building Internet-scale search engines requires huge amounts of data and therefore large numbers of machines to process it. Yahoo! Search consists of four primary components: the Crawler, which downloads pages from web servers; the WebMap, which builds a graph of the known Web; the Indexer, which builds a reverse index to the best pages; and the Runtime, which answers users’ queries. The WebMap is a graph that consists of roughly 1 trillion (10^12) edges, each representing a web link, and 100 billion (10^11) nodes, each representing a distinct URL. Creating and analyzing such a large graph requires a large number of computers running for many days. In early 2005, the infrastructure for the WebMap, named Dreadnaught, needed to be redesigned to scale up to more nodes. Dreadnaught had successfully scaled from 20 to 600 nodes, but required a complete redesign to scale out further. Dreadnaught is similar to MapReduce in many ways, but provides more flexibility and less structure. In particular, each fragment in a Dreadnaught job can send output to each of the fragments in the next stage of the job, but the sort was all done in library code. In practice, most of the WebMap phases were pairs that corresponded to MapReduce. Therefore, the WebMap applications would not require extensive refactoring to fit into MapReduce. Eric Baldeschwieler (Eric14) created a small team and we started designing and prototyping a new framework written in C++ modeled after GFS and MapReduce to replace Dreadnaught. Although the immediate need was for a new framework for WebMap, it was clear that standardization of the batch platform across Yahoo! Search was critical and by making the framework general enough to support other users, we could better leverage investment in the new platform. At the same time, we were watching Hadoop, which was part of Nutch, and its progress. In January 2006, Yahoo! 
hired Doug Cutting, and a month later we decided to abandon our prototype and adopt Hadoop. The advantage of Hadoop over our prototype and design was that it was already working with a real application (Nutch) on 20 nodes. That allowed us to bring up a research cluster two months later and start helping real customers use the new framework much sooner than we could have otherwise. Another advantage, of course, was that since Hadoop was already open source, it was easier (although far from easy!) to get permission from Yahoo!’s legal department to work in open source. So we set up a 200-node cluster for the researchers in early 2006 and put the WebMap conversion plans on hold while we supported and improved Hadoop for the research users. Here’s a quick timeline of how things have progressed:
• 2004—Initial versions of what is now Hadoop Distributed Filesystem and MapReduce implemented by Doug Cutting and Mike Cafarella.
• December 2005—Nutch ported to the new framework. Hadoop runs reliably on 20 nodes.
• January 2006—Doug Cutting joins Yahoo!.
• February 2006—Apache Hadoop project officially started to support the standalone development of MapReduce and HDFS.
Slide 36: • February 2006—Adoption of Hadoop by Yahoo! Grid team.
• April 2006—Sort benchmark (10 GB/node) run on 188 nodes in 47.9 hours.
• May 2006—Yahoo! set up a Hadoop research cluster—300 nodes.
• May 2006—Sort benchmark run on 500 nodes in 42 hours (better hardware than April benchmark).
• October 2006—Research cluster reaches 600 nodes.
• December 2006—Sort benchmark run on 20 nodes in 1.8 hours, 100 nodes in 3.3 hours, 500 nodes in 5.2 hours, 900 nodes in 7.8 hours.
• January 2007—Research cluster reaches 900 nodes.
• April 2007—Research clusters—2 clusters of 1000 nodes.
• April 2008—Won the 1 terabyte sort benchmark in 209 seconds on 900 nodes.
• October 2008—Loading 10 terabytes of data per day on to research clusters.
• March 2009—17 clusters with a total of 24,000 nodes.
• April 2009—Won the minute sort by sorting 500 GB in 59 seconds (on 1,400 nodes) and the 100 terabyte sort in 173 minutes (on 3,400 nodes).
—Owen O’Malley
Apache Hadoop and the Hadoop Ecosystem Although Hadoop is best known for MapReduce and its distributed filesystem (HDFS, renamed from NDFS), the term is also used for a family of related projects that fall under the umbrella of infrastructure for distributed computing and large-scale data processing. Most of the core projects covered in this book are hosted by the Apache Software Foundation, which provides support for a community of open source software projects, including the original HTTP Server from which it gets its name. As the Hadoop ecosystem grows, more projects are appearing, not necessarily hosted at Apache, which provide complementary services to Hadoop, or build on the core to add higher-level abstractions. The Hadoop projects that are covered in this book are described briefly here:
Common
    A set of components and interfaces for distributed filesystems and general I/O (serialization, Java RPC, persistent data structures).
Avro
    A serialization system for efficient, cross-language RPC, and persistent data storage.
MapReduce
    A distributed data processing model and execution environment that runs on large clusters of commodity machines.
Slide 37: HDFS
    A distributed filesystem that runs on large clusters of commodity machines.
Pig
    A data flow language and execution environment for exploring very large datasets. Pig runs on HDFS and MapReduce clusters.
Hive
    A distributed data warehouse. Hive manages data stored in HDFS and provides a query language based on SQL (and which is translated by the runtime engine to MapReduce jobs) for querying the data.
HBase
    A distributed, column-oriented database. HBase uses HDFS for its underlying storage, and supports both batch-style computations using MapReduce and point queries (random reads).
ZooKeeper
    A distributed, highly available coordination service. ZooKeeper provides primitives such as distributed locks that can be used for building distributed applications.
Sqoop
    A tool for efficiently moving data between relational databases and HDFS.
Slide 39: CHAPTER 2 MapReduce MapReduce is a programming model for data processing. The model is simple, yet not too simple to express useful programs in. Hadoop can run MapReduce programs written in various languages; in this chapter, we shall look at the same program expressed in Java, Ruby, Python, and C++. Most important, MapReduce programs are inherently parallel, thus putting very large-scale data analysis into the hands of anyone with enough machines at their disposal. MapReduce comes into its own for large datasets, so let’s start by looking at one. A Weather Dataset For our example, we will write a program that mines weather data. Weather sensors collecting data every hour at many locations across the globe gather a large volume of log data, which is a good candidate for analysis with MapReduce, since it is semistructured and record-oriented. Data Format The data we will use is from the National Climatic Data Center (NCDC, http://www.ncdc.noaa.gov/). The data is stored using a line-oriented ASCII format, in which each line is a record. The format supports a rich set of meteorological elements, many of which are optional or with variable data lengths. For simplicity, we shall focus on the basic elements, such as temperature, which are always present and are of fixed width. Example 2-1 shows a sample line with some of the salient fields highlighted. The line has been split into multiple lines to show each field: in the real file, fields are packed into one line with no delimiters.
Slide 40: Example 2-1. Format of a National Climatic Data Center record
    0057
    332130   # USAF weather station identifier
    99999    # WBAN weather station identifier
    19500101 # observation date
    0300     # observation time
    4
    +51317   # latitude (degrees x 1000)
    +028783  # longitude (degrees x 1000)
    FM-12
    +0171    # elevation (meters)
    99999
    V020
    320      # wind direction (degrees)
    1        # quality code
    N
    0072
    1
    00450    # sky ceiling height (meters)
    1        # quality code
    C
    N
    010000   # visibility distance (meters)
    1        # quality code
    N
    9
    -0128    # air temperature (degrees Celsius x 10)
    1        # quality code
    -0139    # dew point temperature (degrees Celsius x 10)
    1        # quality code
    10268    # atmospheric pressure (hectopascals x 10)
    1        # quality code
Data files are organized by date and weather station. There is a directory for each year from 1901 to 2001, each containing a gzipped file for each weather station with its readings for that year. For example, here are the first entries for 1990:
    % ls raw/1990 | head
    010010-99999-1990.gz
    010014-99999-1990.gz
    010015-99999-1990.gz
    010016-99999-1990.gz
    010017-99999-1990.gz
    010030-99999-1990.gz
    010040-99999-1990.gz
    010080-99999-1990.gz
    010100-99999-1990.gz
    010150-99999-1990.gz
Since there are tens of thousands of weather stations, the whole dataset is made up of a large number of relatively small files. It’s generally easier and more efficient to process a smaller number of relatively large files, so the data was preprocessed so that each
Slide 41: year’s readings were concatenated into a single file. (The means by which this was carried out is described in Appendix C.) Analyzing the Data with Unix Tools What’s the highest recorded global temperature for each year in the dataset? We will answer this first without using Hadoop, as this information will provide a performance baseline, as well as a useful means to check our results. The classic tool for processing line-oriented data is awk. Example 2-2 is a small script to calculate the maximum temperature for each year.
Example 2-2. A program for finding the maximum recorded temperature by year from NCDC weather records
    #!/usr/bin/env bash
    for year in all/*
    do
      echo -ne `basename $year .gz`"\t"
      gunzip -c $year | \
        awk '{ temp = substr($0, 88, 5) + 0;
               q = substr($0, 93, 1);
               if (temp != 9999 && q ~ /[01459]/ && temp > max) max = temp }
             END { print max }'
    done
The script loops through the compressed year files, first printing the year, and then processing each file using awk. The awk script extracts two fields from the data: the air temperature and the quality code. The air temperature value is turned into an integer by adding 0. Next, a test is applied to see if the temperature is valid (the value 9999 signifies a missing value in the NCDC dataset) and if the quality code indicates that the reading is not suspect or erroneous. If the reading is OK, the value is compared with the maximum value seen so far, which is updated if a new maximum is found. The END block is executed after all the lines in the file have been processed, and it prints the maximum value. Here is the beginning of a run:
    % ./max_temperature.sh
    1901    317
    1902    244
    1903    289
    1904    256
    1905    283
    ...
The temperature values in the source file are scaled by a factor of 10, so this works out as a maximum temperature of 31.7°C for 1901 (there were very few readings at the beginning of the century, so this is plausible). 
The complete run for the century took 42 minutes in one run on a single EC2 High-CPU Extra Large Instance.
Slide 42: To speed up the processing, we need to run parts of the program in parallel. In theory, this is straightforward: we could process different years in different processes, using all the available hardware threads on a machine. There are a few problems with this, however. First, dividing the work into equal-size pieces isn’t always easy or obvious. In this case, the file size for different years varies widely, so some processes will finish much earlier than others. Even if they pick up further work, the whole run is dominated by the longest file. A better approach, although one that requires more work, is to split the input into fixed-size chunks and assign each chunk to a process. Second, combining the results from independent processes may need further processing. In this case, the result for each year is independent of other years and may be combined by concatenating all the results, and sorting by year. If using the fixed-size chunk approach, the combination is more delicate. For this example, data for a particular year will typically be split into several chunks, each processed independently. We’ll end up with the maximum temperature for each chunk, so the final step is to look for the highest of these maximums, for each year. Third, you are still limited by the processing capacity of a single machine. If the best time you can achieve is 20 minutes with the number of processors you have, then that’s it. You can’t make it go faster. Also, some datasets grow beyond the capacity of a single machine. When we start using multiple machines, a whole host of other factors come into play, mainly falling in the category of coordination and reliability. Who runs the overall job? How do we deal with failed processes? So, though it’s feasible to parallelize the processing, in practice it’s messy. Using a framework like Hadoop to take care of these issues is a great help. 
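To make the fixed-size chunk approach concrete, here is a minimal single-machine sketch in plain Java, with no Hadoop involved. The names ChunkedMax, maxPerChunk and combine, and the sample readings, are illustrative assumptions rather than code from the book. Each chunk yields a maximum per year, and the final step takes the highest of these maximums for each year.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative sketch only: these names are hypothetical, not part of Hadoop.
class ChunkedMax {

    // Step 1: a worker finds the maximum temperature per year within its own chunk.
    // Each reading is a two-element array: {year, temperature in tenths of a degree}.
    static Map<Integer, Integer> maxPerChunk(List<int[]> chunk) {
        Map<Integer, Integer> max = new TreeMap<>();
        for (int[] reading : chunk) {
            max.merge(reading[0], reading[1], Integer::max);
        }
        return max;
    }

    // Step 2: combine the partial results by keeping the highest maximum for each year.
    static Map<Integer, Integer> combine(List<Map<Integer, Integer>> partials) {
        Map<Integer, Integer> result = new TreeMap<>();
        for (Map<Integer, Integer> partial : partials) {
            partial.forEach((year, temp) -> result.merge(year, temp, Integer::max));
        }
        return result;
    }

    public static void main(String[] args) {
        // Data for 1950 is deliberately split across the two chunks.
        List<int[]> chunk1 = List.of(new int[] {1949, 111}, new int[] {1950, 0});
        List<int[]> chunk2 = List.of(new int[] {1950, 22}, new int[] {1950, -11});
        System.out.println(combine(List.of(maxPerChunk(chunk1), maxPerChunk(chunk2))));
    }
}
```

The sketch covers only the splitting and combining steps; it says nothing about scheduling, failed processes, or multiple machines, which are the harder problems described above.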
Analyzing the Data with Hadoop To take advantage of the parallel processing that Hadoop provides, we need to express our query as a MapReduce job. After some local, small-scale testing, we will be able to run it on a cluster of machines. Map and Reduce MapReduce works by breaking the processing into two phases: the map phase and the reduce phase. Each phase has key-value pairs as input and output, the types of which may be chosen by the programmer. The programmer also specifies two functions: the map function and the reduce function. The input to our map phase is the raw NCDC data. We choose a text input format that gives us each line in the dataset as a text value. The key is the offset of the beginning of the line from the beginning of the file, but as we have no need for this, we ignore it. 18 | Chapter 2: MapReduce
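The (offset, line) pairs that a text input format hands to the map function can be imitated in a few lines of plain Java. This is a sketch under stated assumptions: LineOffsets and pairs are hypothetical names, the input is a small in-memory string, and lines are assumed to end with a single newline character.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only: not Hadoop's TextInputFormat, just the same idea.
class LineOffsets {

    // Returns one "(offset, line)" string per line, where offset is the
    // byte position of the start of the line from the start of the text.
    static List<String> pairs(String text) {
        List<String> out = new ArrayList<>();
        long offset = 0;
        for (String line : text.split("\n")) {
            out.add("(" + offset + ", " + line + ")");
            // Advance past the line's bytes plus one byte for the newline.
            offset += line.getBytes(StandardCharsets.UTF_8).length + 1;
        }
        return out;
    }

    public static void main(String[] args) {
        // Prints (0, first), (6, second line), (18, third), one per line.
        pairs("first\nsecond line\nthird\n").forEach(System.out::println);
    }
}
```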
Slide 43: Our map function is simple. We pull out the year and the air temperature, since these are the only fields we are interested in. In this case, the map function is just a data preparation phase, setting up the data in such a way that the reducer function can do its work on it: finding the maximum temperature for each year. The map function is also a good place to drop bad records: here we filter out temperatures that are missing, suspect, or erroneous.

To visualize the way the map works, consider the following sample lines of input data (some unused columns have been dropped to fit the page, indicated by ellipses):

    0067011990999991950051507004...9999999N9+00001+99999999999...
    0043011990999991950051512004...9999999N9+00221+99999999999...
    0043011990999991950051518004...9999999N9-00111+99999999999...
    0043012650999991949032412004...0500001N9+01111+99999999999...
    0043012650999991949032418004...0500001N9+00781+99999999999...

These lines are presented to the map function as the key-value pairs:

    (0, 0067011990999991950051507004...9999999N9+00001+99999999999...)
    (106, 0043011990999991950051512004...9999999N9+00221+99999999999...)
    (212, 0043011990999991950051518004...9999999N9-00111+99999999999...)
    (318, 0043012650999991949032412004...0500001N9+01111+99999999999...)
    (424, 0043012650999991949032418004...0500001N9+00781+99999999999...)

The keys are the line offsets within the file, which we ignore in our map function. The map function merely extracts the year and the air temperature (indicated in bold text), and emits them as its output (the temperature values have been interpreted as integers):

    (1950, 0)
    (1950, 22)
    (1950, −11)
    (1949, 111)
    (1949, 78)

The output from the map function is processed by the MapReduce framework before being sent to the reduce function. This processing sorts and groups the key-value pairs by key.
So, continuing the example, our reduce function sees the following input: (1949, [111, 78]) (1950, [0, 22, −11]) Each year appears with a list of all its air temperature readings. All the reduce function has to do now is iterate through the list and pick up the maximum reading: (1949, 111) (1950, 22) This is the final output: the maximum global temperature recorded in each year. The whole data flow is illustrated in Figure 2-1. At the bottom of the diagram is a Unix pipeline, which mimics the whole MapReduce flow, and which we will see again later in the chapter when we look at Hadoop Streaming. Analyzing the Data with Hadoop | 19
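The logical flow just described (map output, then sort and group by key, then reduce) can be traced in plain Java on the five map output pairs from the example. This is only a single-process sketch: LogicalFlow, shuffle and reduce are hypothetical names, and a TreeMap stands in for the framework's sort-and-group step.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative sketch only: a single-process stand-in for the MapReduce data flow.
class LogicalFlow {

    // Sort by key and group together all values that share a key.
    static Map<String, List<Integer>> shuffle(List<Map.Entry<String, Integer>> mapOutput) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> pair : mapOutput) {
            grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
        }
        return grouped;
    }

    // Reduce: iterate through one year's readings and pick the maximum.
    static int reduce(List<Integer> temperatures) {
        int max = Integer.MIN_VALUE;
        for (int t : temperatures) {
            max = Math.max(max, t);
        }
        return max;
    }

    public static void main(String[] args) {
        // The map output from the worked example above.
        List<Map.Entry<String, Integer>> mapOutput = List.of(
            Map.entry("1950", 0), Map.entry("1950", 22), Map.entry("1950", -11),
            Map.entry("1949", 111), Map.entry("1949", 78));

        Map<String, List<Integer>> grouped = shuffle(mapOutput);
        System.out.println(grouped); // {1949=[111, 78], 1950=[0, 22, -11]}
        grouped.forEach((year, temps) ->
            System.out.println("(" + year + ", " + reduce(temps) + ")")); // (1949, 111) then (1950, 22)
    }
}
```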
Slide 44: Figure 2-1. MapReduce logical data flow

Java MapReduce

Having run through how the MapReduce program works, the next step is to express it in code. We need three things: a map function, a reduce function, and some code to run the job. The map function is represented by an implementation of the Mapper interface, which declares a map() method. Example 2-3 shows the implementation of our map function.

Example 2-3. Mapper for maximum temperature example

    import java.io.IOException;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.Mapper;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reporter;

    public class MaxTemperatureMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, IntWritable> {

      private static final int MISSING = 9999;

      public void map(LongWritable key, Text value,
          OutputCollector<Text, IntWritable> output, Reporter reporter)
          throws IOException {

        String line = value.toString();
        String year = line.substring(15, 19);
        int airTemperature;
        if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs
          airTemperature = Integer.parseInt(line.substring(88, 92));
        } else {
          airTemperature = Integer.parseInt(line.substring(87, 92));
        }
        String quality = line.substring(92, 93);
        if (airTemperature != MISSING && quality.matches("[01459]")) {
          output.collect(new Text(year), new IntWritable(airTemperature));
        }
      }
    }
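The column extraction used by the mapper can be tried on its own, without any Hadoop classes. The sketch below is an assumption-laden stand-in: RecordParser, parse and synthetic are hypothetical names, and the input lines are synthetic 93-character strings in which only the year, temperature, and quality columns are meaningful (they are not real NCDC records). It keeps the manual handling of the leading plus sign from Example 2-3, although Integer.parseInt in Java 7 and later also accepts one.

```java
import java.util.Arrays;

// Illustrative sketch only: the same fixed-column parsing as the mapper, minus Hadoop.
class RecordParser {
    static final int MISSING = 9999;

    // Returns {year, temperature} for a usable record, or null when the reading
    // is missing or the quality code is not one of 0, 1, 4, 5, 9.
    static int[] parse(String line) {
        int year = Integer.parseInt(line.substring(15, 19));
        // Skip a leading '+' by hand; a leading '-' is left for parseInt to handle.
        int start = line.charAt(87) == '+' ? 88 : 87;
        int temperature = Integer.parseInt(line.substring(start, 92));
        String quality = line.substring(92, 93);
        if (temperature == MISSING || !quality.matches("[01459]")) {
            return null;
        }
        return new int[] {year, temperature};
    }

    // Builds a synthetic line: zeros everywhere except the three columns read by parse().
    static String synthetic(String year, String temperature, char quality) {
        StringBuilder sb = new StringBuilder("0".repeat(93));
        sb.replace(15, 19, year);        // columns 15-18: year
        sb.replace(87, 92, temperature); // columns 87-91: signed temperature
        sb.setCharAt(92, quality);       // column 92: quality code
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(parse(synthetic("1950", "+0022", '1')))); // [1950, 22]
        System.out.println(Arrays.toString(parse(synthetic("1950", "-0011", '1')))); // [1950, -11]
        System.out.println(Arrays.toString(parse(synthetic("1950", "+9999", '1')))); // null
    }
}
```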
Slide 45: The Mapper interface is a generic type, with four formal type parameters that specify the input key, input value, output key, and output value types of the map function. For the present example, the input key is a long integer offset, the input value is a line of text, the output key is a year, and the output value is an air temperature (an integer). Rather than use built-in Java types, Hadoop provides its own set of basic types that are optimized for network serialization. These are found in the org.apache.hadoop.io package. Here we use LongWritable, which corresponds to a Java Long, Text (like Java String), and IntWritable (like Java Integer). The map() method is passed a key and a value. We convert the Text value containing the line of input into a Java String, then use its substring() method to extract the columns we are interested in. The map() method also provides an instance of OutputCollector to write the output to. In this case, we write the year as a Text object (since we are just using it as a key), and the temperature is wrapped in an IntWritable. We write an output record only if the temperature is present and the quality code indicates the temperature reading is OK. The reduce function is similarly defined using a Reducer, as illustrated in Example 2-4. Example 2-4. 
Reducer for maximum temperature example

    import java.io.IOException;
    import java.util.Iterator;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reducer;
    import org.apache.hadoop.mapred.Reporter;

    public class MaxTemperatureReducer extends MapReduceBase
        implements Reducer<Text, IntWritable, Text, IntWritable> {

      public void reduce(Text key, Iterator<IntWritable> values,
          OutputCollector<Text, IntWritable> output, Reporter reporter)
          throws IOException {

        int maxValue = Integer.MIN_VALUE;
        while (values.hasNext()) {
          maxValue = Math.max(maxValue, values.next().get());
        }
        output.collect(key, new IntWritable(maxValue));
      }
    }

Again, four formal type parameters are used to specify the input and output types, this time for the reduce function. The input types of the reduce function must match the output types of the map function: Text and IntWritable. And in this case, the output types of the reduce function are Text and IntWritable, for a year and its maximum
Slide 46: temperature, which we find by iterating through the temperatures and comparing each with a record of the highest found so far. The third piece of code runs the MapReduce job (see Example 2-5).

Example 2-5. Application to find the maximum temperature in the weather dataset

    import java.io.IOException;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;

    public class MaxTemperature {

      public static void main(String[] args) throws IOException {
        if (args.length != 2) {
          System.err.println("Usage: MaxTemperature <input path> <output path>");
          System.exit(-1);
        }

        JobConf conf = new JobConf(MaxTemperature.class);
        conf.setJobName("Max temperature");

        FileInputFormat.addInputPath(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        conf.setMapperClass(MaxTemperatureMapper.class);
        conf.setReducerClass(MaxTemperatureReducer.class);

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);

        JobClient.runJob(conf);
      }
    }

A JobConf object forms the specification of the job. It gives you control over how the job is run. When we run this job on a Hadoop cluster, we will package the code into a JAR file (which Hadoop will distribute around the cluster). Rather than explicitly specify the name of the JAR file, we can pass a class in the JobConf constructor, which Hadoop will use to locate the relevant JAR file by looking for the JAR file containing this class.

Having constructed a JobConf object, we specify the input and output paths. An input path is specified by calling the static addInputPath() method on FileInputFormat, and it can be a single file, a directory (in which case, the input forms all the files in that directory), or a file pattern.
As the name suggests, addInputPath() can be called more than once to use input from multiple paths.
Slide 47: The output path (of which there is only one) is specified by the static setOutputPath() method on FileOutputFormat. It specifies a directory where the output files from the reducer functions are written. The directory shouldn’t exist before running the job; if it does, Hadoop will complain and not run the job. This precaution is to prevent data loss (it can be very annoying to accidentally overwrite the output of a long job with another).

Next, we specify the map and reduce types to use via the setMapperClass() and setReducerClass() methods. The setOutputKeyClass() and setOutputValueClass() methods control the output types for the map and the reduce functions, which are often the same, as they are in our case. If they are different, then the map output types can be set using the methods setMapOutputKeyClass() and setMapOutputValueClass(). The input types are controlled via the input format, which we have not explicitly set since we are using the default TextInputFormat.

After setting the classes that define the map and reduce functions, we are ready to run the job. The static runJob() method on JobClient submits the job and waits for it to finish, writing information about its progress to the console.

A test run

After writing a MapReduce job, it’s normal to try it out on a small dataset to flush out any immediate problems with the code. First install Hadoop in standalone mode (there are instructions for how to do this in Appendix A). This is the mode in which Hadoop runs using the local filesystem with a local job runner. Let’s test it on the five-line sample discussed earlier (the output has been slightly reformatted to fit the page):

    % export HADOOP_CLASSPATH=build/classes
    % hadoop MaxTemperature input/ncdc/sample.txt output
    09/04/07 12:34:35 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
    09/04/07 12:34:35 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments.
    Applications should implement Tool for the same.
    09/04/07 12:34:35 WARN mapred.JobClient: No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
    09/04/07 12:34:35 INFO mapred.FileInputFormat: Total input paths to process : 1
    09/04/07 12:34:35 INFO mapred.JobClient: Running job: job_local_0001
    09/04/07 12:34:35 INFO mapred.FileInputFormat: Total input paths to process : 1
    09/04/07 12:34:35 INFO mapred.MapTask: numReduceTasks: 1
    09/04/07 12:34:35 INFO mapred.MapTask: io.sort.mb = 100
    09/04/07 12:34:35 INFO mapred.MapTask: data buffer = 79691776/99614720
    09/04/07 12:34:35 INFO mapred.MapTask: record buffer = 262144/327680
    09/04/07 12:34:35 INFO mapred.MapTask: Starting flush of map output
    09/04/07 12:34:36 INFO mapred.MapTask: Finished spill 0
    09/04/07 12:34:36 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
    09/04/07 12:34:36 INFO mapred.LocalJobRunner: file:/Users/tom/workspace/htdg/input/ncdc/sample.txt:0+529
    09/04/07 12:34:36 INFO mapred.TaskRunner: Task 'attempt_local_0001_m_000000_0' done.
Slide 48:
    09/04/07 12:34:36 INFO mapred.LocalJobRunner:
    09/04/07 12:34:36 INFO mapred.Merger: Merging 1 sorted segments
    09/04/07 12:34:36 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 57 bytes
    09/04/07 12:34:36 INFO mapred.LocalJobRunner:
    09/04/07 12:34:36 INFO mapred.TaskRunner: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
    09/04/07 12:34:36 INFO mapred.LocalJobRunner:
    09/04/07 12:34:36 INFO mapred.TaskRunner: Task attempt_local_0001_r_000000_0 is allowed to commit now
    09/04/07 12:34:36 INFO mapred.FileOutputCommitter: Saved output of task 'attempt_local_0001_r_000000_0' to file:/Users/tom/workspace/htdg/output
    09/04/07 12:34:36 INFO mapred.LocalJobRunner: reduce > reduce
    09/04/07 12:34:36 INFO mapred.TaskRunner: Task 'attempt_local_0001_r_000000_0' done.
    09/04/07 12:34:36 INFO mapred.JobClient: map 100% reduce 100%
    09/04/07 12:34:36 INFO mapred.JobClient: Job complete: job_local_0001
    09/04/07 12:34:36 INFO mapred.JobClient: Counters: 13
    09/04/07 12:34:36 INFO mapred.JobClient:   FileSystemCounters
    09/04/07 12:34:36 INFO mapred.JobClient:     FILE_BYTES_READ=27571
    09/04/07 12:34:36 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=53907
    09/04/07 12:34:36 INFO mapred.JobClient:   Map-Reduce Framework
    09/04/07 12:34:36 INFO mapred.JobClient:     Reduce input groups=2
    09/04/07 12:34:36 INFO mapred.JobClient:     Combine output records=0
    09/04/07 12:34:36 INFO mapred.JobClient:     Map input records=5
    09/04/07 12:34:36 INFO mapred.JobClient:     Reduce shuffle bytes=0
    09/04/07 12:34:36 INFO mapred.JobClient:     Reduce output records=2
    09/04/07 12:34:36 INFO mapred.JobClient:     Spilled Records=10
    09/04/07 12:34:36 INFO mapred.JobClient:     Map output bytes=45
    09/04/07 12:34:36 INFO mapred.JobClient:     Map input bytes=529
    09/04/07 12:34:36 INFO mapred.JobClient:     Combine input records=0
    09/04/07 12:34:36 INFO mapred.JobClient:     Map output records=5
    09/04/07 12:34:36 INFO mapred.JobClient:     Reduce input records=5

When the hadoop command is
invoked with a classname as the first argument, it launches a JVM to run the class. It is more convenient to use hadoop than straight java since the former adds the Hadoop libraries (and their dependencies) to the classpath and picks up the Hadoop configuration, too. To add the application classes to the classpath, we’ve defined an environment variable called HADOOP_CLASSPATH, which the hadoop script picks up. When running in local (standalone) mode, the programs in this book all assume that you have set the HADOOP_CLASSPATH in this way. The commands should be run from the directory that the example code is installed in. The output from running the job provides some useful information. (The warning about the job JAR file not being found is expected, since we are running in local mode without a JAR. We won’t see this warning when we run on a cluster.) For example, we can see that the job was given an ID of job_local_0001, and it ran one map task and one reduce task (with the IDs attempt_local_0001_m_000000_0 and
Slide 49: attempt_local_0001_r_000000_0). Knowing the job and task IDs can be very useful when debugging MapReduce jobs. The last section of the output, titled “Counters,” shows the statistics that Hadoop generates for each job it runs. These are very useful for checking whether the amount of data processed is what you expected. For example, we can follow the number of records that went through the system: five map inputs produced five map outputs, then five reduce inputs in two groups produced two reduce outputs. The output was written to the output directory, which contains one output file per reducer. The job had a single reducer, so we find a single file, named part-00000: % cat output/part-00000 1949 111 1950 22 This result is the same as when we went through it by hand earlier. We interpret this as saying that the maximum temperature recorded in 1949 was 11.1°C, and in 1950 it was 2.2°C. The new Java MapReduce API Release 0.20.0 of Hadoop included a new Java MapReduce API, sometimes referred to as “Context Objects,” designed to make the API easier to evolve in the future. The new API is type-incompatible with the old, however, so applications need to be rewritten to take advantage of it.* There are several notable differences between the two APIs: • The new API favors abstract classes over interfaces, since these are easier to evolve. For example, you can add a method (with a default implementation) to an abstract class without breaking old implementations of the class. In the new API, the Mapper and Reducer interfaces are now abstract classes. • The new API is in the org.apache.hadoop.mapreduce package (and subpackages). The old API can still be found in org.apache.hadoop.mapred. • The new API makes extensive use of context objects that allow the user code to communicate with the MapReduce system. The MapContext, for example, essentially unifies the role of the JobConf, the OutputCollector, and the Reporter. 
• The new API supports both a “push” and a “pull” style of iteration. In both APIs, key-value record pairs are pushed to the mapper, but in addition, the new API allows a mapper to pull records from within the map() method. The same goes for the reducer. An example of how the “pull” style can be useful is processing records in batches, rather than one by one. * The new API is not complete (or stable) in the 0.20 release series (the latest available at the time of writing). This book uses the old API for this reason. However, a copy of all of the examples in this book, rewritten to use the new API (for releases 0.21.0 and later), will be made available on the book’s website. Analyzing the Data with Hadoop | 25
Slide 50:
• Configuration has been unified. The old API has a special JobConf object for job configuration, which is an extension of Hadoop’s vanilla Configuration object (used for configuring daemons; see “The Configuration API” on page 130). In the new API, this distinction is dropped, so job configuration is done through a Configuration.
• Job control is performed through the Job class, rather than JobClient, which no longer exists in the new API.
• Output files are named slightly differently: part-m-nnnnn for map outputs, and part-r-nnnnn for reduce outputs (where nnnnn is an integer designating the part number, starting from zero).

Example 2-6 shows the MaxTemperature application rewritten to use the new API. The differences are highlighted in bold.

When converting your Mapper and Reducer classes to the new API, don’t forget to change the signature of the map() and reduce() methods to the new form. Just changing your class to extend the new Mapper or Reducer classes will not produce a compilation error or warning, since these classes provide an identity form of the map() or reduce() method (respectively). Your mapper or reducer code, however, will not be invoked, which can lead to some hard-to-diagnose errors.

Example 2-6. 
Application to find the maximum temperature in the weather dataset using the new context objects MapReduce API

    import java.io.IOException;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class NewMaxTemperature {

      static class NewMaxTemperatureMapper
          extends Mapper<LongWritable, Text, Text, IntWritable> {

        private static final int MISSING = 9999;

        public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

          String line = value.toString();
          String year = line.substring(15, 19);
          int airTemperature;
          if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs
            airTemperature = Integer.parseInt(line.substring(88, 92));
          } else {
            airTemperature = Integer.parseInt(line.substring(87, 92));
          }
          String quality = line.substring(92, 93);
          if (airTemperature != MISSING && quality.matches("[01459]")) {
            context.write(new Text(year), new IntWritable(airTemperature));
          }
        }
      }
Slide 51:
      static class NewMaxTemperatureReducer
          extends Reducer<Text, IntWritable, Text, IntWritable> {

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {

          int maxValue = Integer.MIN_VALUE;
          for (IntWritable value : values) {
            maxValue = Math.max(maxValue, value.get());
          }
          context.write(key, new IntWritable(maxValue));
        }
      }

      public static void main(String[] args) throws Exception {
        if (args.length != 2) {
          System.err.println("Usage: NewMaxTemperature <input path> <output path>");
          System.exit(-1);
        }

        Job job = new Job();
        job.setJarByClass(NewMaxTemperature.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(NewMaxTemperatureMapper.class);
        job.setReducerClass(NewMaxTemperatureReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }

Scaling Out

You’ve seen how MapReduce works for small inputs; now it’s time to take a bird’s-eye view of the system and look at the data flow for large inputs. For simplicity, the examples so far have used files on the local filesystem. However, to scale out, we need to store the data in a distributed filesystem, typically HDFS (which you’ll learn about in the next chapter), to allow Hadoop to move the MapReduce computation to each machine hosting a part of the data. Let’s see how this works.
Slide 52: Data Flow

First, some terminology. A MapReduce job is a unit of work that the client wants to be performed: it consists of the input data, the MapReduce program, and configuration information. Hadoop runs the job by dividing it into tasks, of which there are two types: map tasks and reduce tasks.

There are two types of nodes that control the job execution process: a jobtracker and a number of tasktrackers. The jobtracker coordinates all the jobs run on the system by scheduling tasks to run on tasktrackers. Tasktrackers run tasks and send progress reports to the jobtracker, which keeps a record of the overall progress of each job. If a task fails, the jobtracker can reschedule it on a different tasktracker.

Hadoop divides the input to a MapReduce job into fixed-size pieces called input splits, or just splits. Hadoop creates one map task for each split, which runs the user-defined map function for each record in the split.

Having many splits means the time taken to process each split is small compared to the time to process the whole input. So if we are processing the splits in parallel, the processing is better load-balanced if the splits are small, since a faster machine will be able to process proportionally more splits over the course of the job than a slower machine. Even if the machines are identical, failed processes or other jobs running concurrently make load balancing desirable, and the quality of the load balancing increases as the splits become more fine-grained.

On the other hand, if splits are too small, then the overhead of managing the splits and of map task creation begins to dominate the total job execution time. For most jobs, a good split size tends to be the size of an HDFS block, 64 MB by default, although this can be changed for the cluster (for all newly created files), or specified when each file is created.

Hadoop does its best to run the map task on a node where the input data resides in HDFS.
This is called the data locality optimization. It should now be clear why the optimal split size is the same as the block size: it is the largest size of input that can be guaranteed to be stored on a single node. If the split spanned two blocks, it would be unlikely that any HDFS node stored both blocks, so some of the split would have to be transferred across the network to the node running the map task, which is clearly less efficient than running the whole map task using local data. Map tasks write their output to the local disk, not to HDFS. Why is this? Map output is intermediate output: it’s processed by reduce tasks to produce the final output, and once the job is complete the map output can be thrown away. So storing it in HDFS, with replication, would be overkill. If the node running the map task fails before the map output has been consumed by the reduce task, then Hadoop will automatically rerun the map task on another node to re-create the map output. 28 | Chapter 2: MapReduce
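As a rough worked example of how split size translates into map tasks, the sketch below divides an input size by a split size and rounds up. It is deliberately simplified and rests on assumptions: SplitCount and splits are hypothetical names, the input is treated as one splittable stream of bytes, and the rules a real input format applies (such as handling each file separately) are ignored.

```java
// Illustrative arithmetic only: not how Hadoop's input formats actually compute splits.
class SplitCount {
    static final long MB = 1024L * 1024L;

    // Number of splits (and hence map tasks) for an input of inputBytes,
    // assuming every split is splitBytes long except possibly the last one.
    static long splits(long inputBytes, long splitBytes) {
        return (inputBytes + splitBytes - 1) / splitBytes; // integer ceiling division
    }

    public static void main(String[] args) {
        long block = 64 * MB; // the default HDFS block size mentioned in the text
        System.out.println(splits(1024 * MB, block)); // 16
        System.out.println(splits(100 * MB, block));  // 2
    }
}
```

With one split per block, each of the 16 map tasks in the first case can in principle read its whole input from a single node.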
Slide 53: Reduce tasks don’t have the advantage of data locality—the input to a single reduce task is normally the output from all mappers. In the present example, we have a single reduce task that is fed by all of the map tasks. Therefore, the sorted map outputs have to be transferred across the network to the node where the reduce task is running, where they are merged and then passed to the user-defined reduce function. The output of the reduce is normally stored in HDFS for reliability. As explained in Chapter 3, for each HDFS block of the reduce output, the first replica is stored on the local node, with other replicas being stored on off-rack nodes. Thus, writing the reduce output does consume network bandwidth, but only as much as a normal HDFS write pipeline consumes. The whole data flow with a single reduce task is illustrated in Figure 2-2. The dotted boxes indicate nodes, the light arrows show data transfers on a node, and the heavy arrows show data transfers between nodes. Figure 2-2. MapReduce data flow with a single reduce task The number of reduce tasks is not governed by the size of the input, but is specified independently. In “The Default MapReduce Job” on page 191, you will see how to choose the number of reduce tasks for a given job. When there are multiple reducers, the map tasks partition their output, each creating one partition for each reduce task. There can be many keys (and their associated values) in each partition, but the records for any given key are all in a single partition. The partitioning can be controlled by a user-defined partitioning function, but normally the default partitioner—which buckets keys using a hash function—works very well. Scaling Out | 29
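The idea of bucketing keys with a hash function can be sketched in plain Java as follows. HashPartitionSketch and partition are hypothetical names, and the keys are plain Java strings, whose hash codes differ from those of Hadoop's Text type, so the partition numbers printed here are not the ones a real job would produce. The point is only that every record with a given key lands in the same partition.

```java
// Illustrative sketch only: hash-based partitioning of keys across reduce tasks.
class HashPartitionSketch {

    // Maps a key to a partition number in the range [0, numReduceTasks).
    // Masking with Integer.MAX_VALUE clears the sign bit so the result is never negative.
    static int partition(Object key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        int numReduceTasks = 3;
        for (String year : new String[] {"1949", "1950", "1950", "1951"}) {
            // Both "1950" records print the same partition number.
            System.out.println(year + " -> partition " + partition(year, numReduceTasks));
        }
    }
}
```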
Slide 54: The data flow for the general case of multiple reduce tasks is illustrated in Figure 2-3. This diagram makes it clear why the data flow between map and reduce tasks is colloquially known as “the shuffle,” as each reduce task is fed by many map tasks. The shuffle is more complicated than this diagram suggests, and tuning it can have a big impact on job execution time, as you will see in “Shuffle and Sort” on page 177. Figure 2-3. MapReduce data flow with multiple reduce tasks Finally, it’s also possible to have zero reduce tasks. This can be appropriate when you don’t need the shuffle since the processing can be carried out entirely in parallel (a few examples are discussed in “NLineInputFormat” on page 211). In this case, the only off-node data transfer is when the map tasks write to HDFS (see Figure 2-4). Combiner Functions Many MapReduce jobs are limited by the bandwidth available on the cluster, so it pays to minimize the data transferred between map and reduce tasks. Hadoop allows the user to specify a combiner function to be run on the map output—the combiner function’s output forms the input to the reduce function. Since the combiner function is an optimization, Hadoop does not provide a guarantee of how many times it will call it for a particular map output record, if at all. In other words, calling the combiner function zero, one, or many times should produce the same output from the reducer. 30 | Chapter 2: MapReduce
Slide 55: Figure 2-4. MapReduce data flow with no reduce tasks The contract for the combiner function constrains the type of function that may be used. This is best illustrated with an example. Suppose that for the maximum temperature example, readings for the year 1950 were processed by two maps (because they were in different splits). Imagine the first map produced the output: (1950, 0) (1950, 20) (1950, 10) And the second produced: (1950, 25) (1950, 15) The reduce function would be called with a list of all the values: (1950, [0, 20, 10, 25, 15]) with output: (1950, 25) since 25 is the maximum value in the list. We could use a combiner function that, just like the reduce function, finds the maximum temperature for each map output. The reduce would then be called with: (1950, [20, 25]) and the reduce would produce the same output as before. More succinctly, we may express the function calls on the temperature values in this case as follows: max(0, 20, 10, 25, 15) = max(max(0, 20, 10), max(25, 15)) = max(20, 25) = 25 Scaling Out | 31
Slide 56: Not all functions possess this property.† For example, if we were calculating mean temperatures, then we couldn’t use the mean as our combiner function, since:

    mean(0, 20, 10, 25, 15) = 14

but:

    mean(mean(0, 20, 10), mean(25, 15)) = mean(10, 20) = 15

The combiner function doesn’t replace the reduce function. (How could it? The reduce function is still needed to process records with the same key from different maps.) But it can help cut down the amount of data shuffled between the maps and the reduces, and for this reason alone it is always worth considering whether you can use a combiner function in your MapReduce job.

Specifying a combiner function

Going back to the Java MapReduce program, the combiner function is defined using the Reducer interface, and for this application, it is the same implementation as the reducer function in MaxTemperatureReducer. The only change we need to make is to set the combiner class on the JobConf (see Example 2-7).

Example 2-7. Application to find the maximum temperature, using a combiner function for efficiency

    public class MaxTemperatureWithCombiner {

      public static void main(String[] args) throws IOException {
        if (args.length != 2) {
          System.err.println("Usage: MaxTemperatureWithCombiner <input path> " +
              "<output path>");
          System.exit(-1);
        }

        JobConf conf = new JobConf(MaxTemperatureWithCombiner.class);
        conf.setJobName("Max temperature");

        FileInputFormat.addInputPath(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        conf.setMapperClass(MaxTemperatureMapper.class);
        conf.setCombinerClass(MaxTemperatureReducer.class);
        conf.setReducerClass(MaxTemperatureReducer.class);

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);

        JobClient.runJob(conf);
      }
    }

† Functions with this property are called distributive in the paper “Data Cube: A Relational Aggregation Operator Generalizing Group-By, Cross-Tab, and Sub-Totals,” Gray et al. (1995).
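The difference between max and mean as combiner candidates can be checked directly. The sketch below is plain Java with hypothetical names (CombinerCheck, max, mean); it reuses the values from the example, with the first map producing 0, 20, 10 and the second producing 25, 15.

```java
// Illustrative sketch only: applying a function per map output, then again on the partial results.
class CombinerCheck {

    static int max(int... values) {
        int m = Integer.MIN_VALUE;
        for (int v : values) {
            m = Math.max(m, v);
        }
        return m;
    }

    static double mean(double... values) {
        double sum = 0;
        for (double v : values) {
            sum += v;
        }
        return sum / values.length;
    }

    public static void main(String[] args) {
        // max gives the same answer with or without the intermediate step.
        System.out.println(max(0, 20, 10, 25, 15));             // 25
        System.out.println(max(max(0, 20, 10), max(25, 15)));   // 25

        // mean does not: the mean of the partial means is not the overall mean.
        System.out.println(mean(0, 20, 10, 25, 15));            // 14.0
        System.out.println(mean(mean(0, 20, 10), mean(25, 15))); // 15.0
    }
}
```

Because the framework may run the combiner zero, one, or many times, only the first kind of function is safe to plug in unchanged.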
Slide 57: Running a Distributed MapReduce Job
The same program will run, without alteration, on a full dataset. This is the point of MapReduce: it scales to the size of your data and the size of your hardware. Here’s one data point: on a 10-node EC2 cluster running High-CPU Extra Large Instances, the program took six minutes to run.‡ We’ll go through the mechanics of running programs on a cluster in Chapter 5.
Hadoop Streaming
Hadoop provides an API to MapReduce that allows you to write your map and reduce functions in languages other than Java. Hadoop Streaming uses Unix standard streams as the interface between Hadoop and your program, so you can use any language that can read standard input and write to standard output to write your MapReduce program.
Streaming is naturally suited for text processing (although, as of version 0.21.0, it can handle binary streams, too), and when used in text mode, it has a line-oriented view of data. Map input data is passed over standard input to your map function, which processes it line by line and writes lines to standard output. A map output key-value pair is written as a single tab-delimited line. Input to the reduce function is in the same format (a tab-separated key-value pair), passed over standard input. The reduce function reads lines from standard input, which the framework guarantees are sorted by key, and writes its results to standard output.
Let’s illustrate this by rewriting our MapReduce program for finding maximum temperatures by year in Streaming.
Ruby
The map function can be expressed in Ruby as shown in Example 2-8.
Example 2-8. Map function for maximum temperature in Ruby
    #!/usr/bin/env ruby

    STDIN.each_line do |line|
      val = line
      year, temp, q = val[15,4], val[87,5], val[92,1]
      puts "#{year}\t#{temp}" if (temp != "+9999" && q =~ /[01459]/)
    end
‡ This is a factor of seven faster than the serial run on one machine using awk. The main reason it wasn’t proportionately faster is that the input data wasn’t evenly partitioned. For convenience, the input files were gzipped by year, resulting in large files for later years in the dataset, when the number of weather records was much higher.
Slide 58: The program iterates over lines from standard input by executing a block for each line from STDIN (a global constant of type IO). The block pulls out the relevant fields from each input line, and, if the temperature is valid, writes the year and the temperature separated by a tab character \t to standard output (using puts).
It’s worth drawing out a design difference between Streaming and the Java MapReduce API. The Java API is geared toward processing your map function one record at a time. The framework calls the map() method on your Mapper for each record in the input, whereas with Streaming the map program can decide how to process the input; for example, it could easily read and process multiple lines at a time since it’s in control of the reading. The user’s Java map implementation is “pushed” records, but it’s still possible to consider multiple lines at a time by accumulating previous lines in an instance variable in the Mapper.§ In this case, you need to implement the close() method so that you know when the last record has been read, so you can finish processing the last group of lines.
Since the script just operates on standard input and output, it’s trivial to test the script without using Hadoop, simply using Unix pipes:
    % cat input/ncdc/sample.txt | ch02/src/main/ruby/max_temperature_map.rb
    1950 +0000
    1950 +0022
    1950 -0011
    1949 +0111
    1949 +0078
The reduce function shown in Example 2-9 is a little more complex.
Example 2-9. Reduce function for maximum temperature in Ruby
    #!/usr/bin/env ruby

    last_key, max_val = nil, 0
    STDIN.each_line do |line|
      key, val = line.split("\t")
      if last_key && last_key != key
        puts "#{last_key}\t#{max_val}"
        last_key, max_val = key, val.to_i
      else
        last_key, max_val = key, [max_val, val.to_i].max
      end
    end
    puts "#{last_key}\t#{max_val}" if last_key
§ Alternatively, you could use “pull” style processing in the new MapReduce API; see “The new Java MapReduce API” on page 25.
Slide 59: Again, the program iterates over lines from standard input, but this time we have to store some state as we process each key group. In this case, the keys are years, and we store the last key seen and the maximum temperature seen so far for that key. The MapReduce framework ensures that the keys are ordered, so we know that if a key is different from the previous one, we have moved into a new key group. In contrast to the Java API, where you are provided an iterator over each key group, in Streaming you have to find key group boundaries in your program.
For each line, we pull out the key and value, then if we’ve just finished a group (last_key && last_key != key), we write the key and the maximum temperature for that group, separated by a tab character, before resetting the maximum temperature for the new key. If we haven’t just finished a group, we just update the maximum temperature for the current key.
The last line of the program ensures that a line is written for the last key group in the input.
We can now simulate the whole MapReduce pipeline with a Unix pipeline (which is equivalent to the Unix pipeline shown in Figure 2-1):
    % cat input/ncdc/sample.txt | ch02/src/main/ruby/max_temperature_map.rb | \
      sort | ch02/src/main/ruby/max_temperature_reduce.rb
    1949 111
    1950 22
The output is the same as the Java program, so the next step is to run it using Hadoop itself.
The hadoop command doesn’t support a Streaming option; instead, you specify the Streaming JAR file along with the jar option. Options to the Streaming program specify the input and output paths, and the map and reduce scripts.
This is what it looks like:
    % hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \
      -input input/ncdc/sample.txt \
      -output output \
      -mapper ch02/src/main/ruby/max_temperature_map.rb \
      -reducer ch02/src/main/ruby/max_temperature_reduce.rb
When running on a large dataset on a cluster, we should set the combiner, using the -combiner option.
From release 0.21.0, the combiner can be any Streaming command. For earlier releases, the combiner had to be written in Java, so as a workaround it was common to do manual combining in the mapper, without having to resort to Java. In this case, we could change the mapper to be a pipeline:
    % hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \
      -input input/ncdc/all \
      -output output \
      -mapper "ch02/src/main/ruby/max_temperature_map.rb | sort | ch02/src/main/ruby/max_temperature_reduce.rb" \
      -reducer ch02/src/main/ruby/max_temperature_reduce.rb \
      -file ch02/src/main/ruby/max_temperature_map.rb \
      -file ch02/src/main/ruby/max_temperature_reduce.rb
Slide 60: Note also the use of -file, which we use when running Streaming programs on the cluster to ship the scripts to the cluster.
Python
Streaming supports any programming language that can read from standard input and write to standard output, so for readers more familiar with Python, here’s the same example again.‖ The map script is in Example 2-10, and the reduce script is in Example 2-11.
Example 2-10. Map function for maximum temperature in Python
    #!/usr/bin/env python

    import re
    import sys

    for line in sys.stdin:
        val = line.strip()
        (year, temp, q) = (val[15:19], val[87:92], val[92:93])
        if (temp != "+9999" and re.match("[01459]", q)):
            # Parenthesized form works under both Python 2 and Python 3.
            print("%s\t%s" % (year, temp))
Example 2-11. Reduce function for maximum temperature in Python
    #!/usr/bin/env python

    import sys

    (last_key, max_val) = (None, 0)
    for line in sys.stdin:
        (key, val) = line.strip().split("\t")
        if last_key and last_key != key:
            # Key changed: emit the finished group and start the next one.
            print("%s\t%s" % (last_key, max_val))
            (last_key, max_val) = (key, int(val))
        else:
            (last_key, max_val) = (key, max(max_val, int(val)))

    if last_key:
        print("%s\t%s" % (last_key, max_val))
‖ As an alternative to Streaming, Python programmers should consider Dumbo (http://www.last.fm/dumbo), which makes the Streaming MapReduce interface more Pythonic and easier to use.
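The reduce scripts above find key group boundaries by hand. As an alternative sketch, the same boundary detection can be delegated to `itertools.groupby` from the Python standard library, which groups consecutive items with equal keys. Like the scripts above, this relies on the input being sorted by key. The function names `parse` and `max_per_key` are hypothetical, and this is not the book's implementation.

```python
from itertools import groupby


def parse(lines):
    """Yield (key, value) pairs from tab-separated key/value lines."""
    for line in lines:
        key, val = line.rstrip("\n").split("\t")
        yield key, int(val)


def max_per_key(lines):
    """Yield (key, maximum value) for each run of equal keys.

    The input must already be sorted by key, as the framework guarantees
    for reduce input.
    """
    for key, group in groupby(parse(lines), key=lambda pair: pair[0]):
        yield key, max(val for _, val in group)


# Sample reduce input, in the same format the map script writes.
sample = ["1949\t+0111\n", "1949\t+0078\n",
          "1950\t+0000\n", "1950\t+0022\n", "1950\t-0011\n"]

for key, max_val in max_per_key(sample):
    print("%s\t%s" % (key, max_val))
```

To use this as a Streaming reducer, one would pass `sys.stdin` to `max_per_key` instead of the sample list. Because each group's maximum is computed from that group's values alone, no starting value is needed.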
Slide 61: We can test the programs and run the job in the same way we did in Ruby. For example, to run a test:
    % cat input/ncdc/sample.txt | ch02/src/main/python/max_temperature_map.py | \
      sort | ch02/src/main/python/max_temperature_reduce.py
    1949 111
    1950 22
Hadoop Pipes
Hadoop Pipes is the name of the C++ interface to Hadoop MapReduce. Unlike Streaming, which uses standard input and output to communicate with the map and reduce code, Pipes uses sockets as the channel over which the tasktracker communicates with the process running the C++ map or reduce function. JNI is not used.
We’ll rewrite the example running through the chapter in C++, and then we’ll see how to run it using Pipes. Example 2-12 shows the source code for the map and reduce functions in C++.
Example 2-12. Maximum temperature in C++
    #include <algorithm>
    #include <climits>   // INT_MIN
    #include <limits>
    #include <stdint.h>
    #include <string>

    #include "hadoop/Pipes.hh"
    #include "hadoop/TemplateFactory.hh"
    #include "hadoop/StringUtils.hh"

    class MaxTemperatureMapper : public HadoopPipes::Mapper {
    public:
      MaxTemperatureMapper(HadoopPipes::TaskContext& context) {
      }
      void map(HadoopPipes::MapContext& context) {
        std::string line = context.getInputValue();
        std::string year = line.substr(15, 4);
        std::string airTemperature = line.substr(87, 5);
        std::string q = line.substr(92, 1);
        if (airTemperature != "+9999" &&
            (q == "0" || q == "1" || q == "4" || q == "5" || q == "9")) {
          context.emit(year, airTemperature);
        }
      }
    };

    class MapTemperatureReducer : public HadoopPipes::Reducer {
    public:
      MapTemperatureReducer(HadoopPipes::TaskContext& context) {
      }
      void reduce(HadoopPipes::ReduceContext& context) {
        int maxValue = INT_MIN;
        while (context.nextValue()) {
          maxValue = std::max(maxValue, HadoopUtils::toInt(context.getInputValue()));
        }
        context.emit(context.getInputKey(), HadoopUtils::toString(maxValue));
      }
    };

    int main(int argc, char *argv[]) {
      return HadoopPipes::runTask(HadoopPipes::TemplateFactory<MaxTemperatureMapper,
                                  MapTemperatureReducer>());
    }
Slide 62: The application links against the Hadoop C++ library, which is a thin wrapper for communicating with the tasktracker child process. The map and reduce functions are defined by extending the Mapper and Reducer classes defined in the HadoopPipes namespace and providing implementations of the map() and reduce() methods in each case. These methods take a context object (of type MapContext or ReduceContext), which provides the means for reading input and writing output, as well as accessing job configuration information via the JobConf class. The processing in this example is very similar to the Java equivalent.
Unlike the Java interface, keys and values in the C++ interface are byte buffers, represented as Standard Template Library (STL) strings. This makes the interface simpler, although it does put a slightly greater burden on the application developer, who has to convert to and from richer domain-level types. This is evident in MapTemperatureReducer, where we have to convert the input value into an integer (using a convenience method in HadoopUtils) and then the maximum value back into a string before it’s written out. In some cases, we can save on doing the conversion, such as in MaxTemperatureMapper, where the airTemperature value is never converted to an integer since it is never processed as a number in the map() method.
The main() method is the application entry point. It calls HadoopPipes::runTask, which connects to the Java parent process and marshals data to and from the Mapper or Reducer. The runTask() method is passed a Factory so that it can create instances of the Mapper or Reducer.
Which one it creates is controlled by the Java parent over the socket connection. There are overloaded template factory methods for setting a combiner, partitioner, record reader, or record writer.
Compiling and Running
Now we can compile and link our program using the Makefile in Example 2-13.
Example 2-13. Makefile for C++ MapReduce program
    CC = g++
    CPPFLAGS = -m32 -I$(HADOOP_INSTALL)/c++/$(PLATFORM)/include

    max_temperature: max_temperature.cpp
        $(CC) $(CPPFLAGS) $< -Wall -L$(HADOOP_INSTALL)/c++/$(PLATFORM)/lib -lhadooppipes \
        -lhadooputils -lpthread -g -O2 -o $@
Slide 63: The Makefile expects a couple of environment variables to be set. Apart from HADOOP_INSTALL (which you should already have set if you followed the installation instructions in Appendix A), you need to define PLATFORM, which specifies the operating system, architecture, and data model (e.g., 32- or 64-bit). I ran it on a 32-bit Linux system with the following: % export PLATFORM=Linux-i386-32 % make On successful completion, you’ll find the max_temperature executable in the current directory. To run a Pipes job, we need to run Hadoop in pseudo-distributed mode (where all the daemons run on the local machine), for which there are setup instructions in Appendix A. Pipes doesn’t run in standalone (local) mode, since it relies on Hadoop’s distributed cache mechanism, which works only when HDFS is running. With the Hadoop daemons now running, the first step is to copy the executable to HDFS so that it can be picked up by tasktrackers when they launch map and reduce tasks: % hadoop fs -put max_temperature bin/max_temperature The sample data also needs to be copied from the local filesystem into HDFS: % hadoop fs -put input/ncdc/sample.txt sample.txt Now we can run the job. For this, we use the Hadoop pipes command, passing the URI of the executable in HDFS using the -program argument: % hadoop pipes \ -D hadoop.pipes.java.recordreader=true \ -D hadoop.pipes.java.recordwriter=true \ -input sample.txt \ -output output \ -program bin/max_temperature We specify two properties using the -D option: hadoop.pipes.java.recordreader and hadoop.pipes.java.recordwriter, setting both to true to say that we have not specified a C++ record reader or writer, but that we want to use the default Java ones (which are for text input and output). Pipes also allows you to set a Java mapper, reducer, combiner, or partitioner. In fact, you can have a mixture of Java or C++ classes within any one job. The result is the same as the other versions of the same program that we ran. 
Slide 65: CHAPTER 3 The Hadoop Distributed Filesystem
When a dataset outgrows the storage capacity of a single physical machine, it becomes necessary to partition it across a number of separate machines. Filesystems that manage the storage across a network of machines are called distributed filesystems. Since they are network-based, all the complications of network programming kick in, thus making distributed filesystems more complex than regular disk filesystems. For example, one of the biggest challenges is making the filesystem tolerate node failure without suffering data loss.
Hadoop comes with a distributed filesystem called HDFS, which stands for Hadoop Distributed Filesystem. (You may sometimes see references to “DFS,” informally or in older documentation or configurations, which is the same thing.) HDFS is Hadoop’s flagship filesystem and is the focus of this chapter, but Hadoop actually has a general-purpose filesystem abstraction, so we’ll see along the way how Hadoop integrates with other storage systems (such as the local filesystem and Amazon S3).
The Design of HDFS
HDFS is a filesystem designed for storing very large files with streaming data access patterns, running on clusters of commodity hardware.* Let’s examine this statement in more detail:
Very large files
“Very large” in this context means files that are hundreds of megabytes, gigabytes, or terabytes in size. There are Hadoop clusters running today that store petabytes of data.†
* The architecture of HDFS is described in “The Hadoop Distributed File System” by Konstantin Shvachko, Hairong Kuang, Sanjay Radia, and Robert Chansler (Proceedings of MSST2010, May 2010, http://storageconference.org/2010/Papers/MSST/Shvachko.pdf).
† “Scaling Hadoop to 4000 nodes at Yahoo!,” http://developer.yahoo.net/blogs/hadoop/2008/09/scaling_hadoop_to_4000_nodes_a.html.
Slide 66: Streaming data access
HDFS is built around the idea that the most efficient data processing pattern is a write-once, read-many-times pattern. A dataset is typically generated or copied from source, then various analyses are performed on that dataset over time. Each analysis will involve a large proportion, if not all, of the dataset, so the time to read the whole dataset is more important than the latency in reading the first record.
Commodity hardware
Hadoop doesn’t require expensive, highly reliable hardware to run on. It’s designed to run on clusters of commodity hardware (commonly available hardware that can be obtained from multiple vendors‡) for which the chance of node failure across the cluster is high, at least for large clusters. HDFS is designed to carry on working without a noticeable interruption to the user in the face of such failure.
It is also worth examining the applications for which using HDFS does not work so well. While this may change in the future, these are areas where HDFS is not a good fit today:
Low-latency data access
Applications that require low-latency access to data, in the tens of milliseconds range, will not work well with HDFS. Remember, HDFS is optimized for delivering a high throughput of data, and this may be at the expense of latency. HBase (Chapter 13) is currently a better choice for low-latency access.
Lots of small files
Since the namenode holds filesystem metadata in memory, the limit to the number of files in a filesystem is governed by the amount of memory on the namenode. As a rule of thumb, each file, directory, and block takes about 150 bytes. So, for example, if you had one million files, each taking one block, you would need at least 300 MB of memory. While storing millions of files is feasible, billions is beyond the capability of current hardware.§
Multiple writers, arbitrary file modifications
Files in HDFS may be written to by a single writer. Writes are always made at the end of the file. There is no support for multiple writers, or for modifications at arbitrary offsets in the file. (These might be supported in the future, but they are likely to be relatively inefficient.)
‡ See Chapter 9 for a typical machine specification.
§ For an in-depth exposition of the scalability limits of HDFS, see Konstantin V. Shvachko’s “Scalability of the Hadoop Distributed File System,” (http://developer.yahoo.net/blogs/hadoop/2010/05/scalability_of_the_hadoop_dist.html) and the companion paper “HDFS Scalability: The limits to growth,” (April 2010, pp. 6–16. http://www.usenix.org/publications/login/2010-04/openpdfs/shvachko.pdf) by the same author.
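The rule of thumb under “Lots of small files” can be worked through in a few lines. The sketch below assumes the 150-byte figure quoted in the text and counts one object per file, one per block, and one per directory. The function name `namenode_memory_bytes` is hypothetical, and the result is a back-of-the-envelope estimate rather than a measurement.

```python
BYTES_PER_OBJECT = 150  # rule-of-thumb figure quoted in the text


def namenode_memory_bytes(num_files, blocks_per_file=1, num_directories=0):
    """Rough namenode memory estimate: files, blocks, and directories each count once."""
    num_blocks = num_files * blocks_per_file
    objects = num_files + num_blocks + num_directories
    return objects * BYTES_PER_OBJECT


# One million single-block files: one million file objects plus one million blocks.
million_files = namenode_memory_bytes(1_000_000)
print(million_files / 1e6, "MB")

# The same estimate for one billion single-block files.
billion_files = namenode_memory_bytes(1_000_000_000)
print(billion_files / 1e9, "GB")
```

The 300 MB in the text comes from counting both the file and its block: two million objects at about 150 bytes each.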
Slide 67: HDFS Concepts Blocks A disk has a block size, which is the minimum amount of data that it can read or write. Filesystems for a single disk build on this by dealing with data in blocks, which are an integral multiple of the disk block size. Filesystem blocks are typically a few kilobytes in size, while disk blocks are normally 512 bytes. This is generally transparent to the filesystem user who is simply reading or writing a file—of whatever length. However, there are tools to perform filesystem maintenance, such as df and fsck, that operate on the filesystem block level. HDFS, too, has the concept of a block, but it is a much larger unit—64 MB by default. Like in a filesystem for a single disk, files in HDFS are broken into block-sized chunks, which are stored as independent units. Unlike a filesystem for a single disk, a file in HDFS that is smaller than a single block does not occupy a full block’s worth of underlying storage. When unqualified, the term “block” in this book refers to a block in HDFS. Why Is a Block in HDFS So Large? HDFS blocks are large compared to disk blocks, and the reason is to minimize the cost of seeks. By making a block large enough, the time to transfer the data from the disk can be made to be significantly larger than the time to seek to the start of the block. Thus the time to transfer a large file made of multiple blocks operates at the disk transfer rate. A quick calculation shows that if the seek time is around 10 ms, and the transfer rate is 100 MB/s, then to make the seek time 1% of the transfer time, we need to make the block size around 100 MB. The default is actually 64 MB, although many HDFS installations use 128 MB blocks. This figure will continue to be revised upward as transfer speeds grow with new generations of disk drives. This argument shouldn’t be taken too far, however. 
Map tasks in MapReduce normally operate on one block at a time, so if you have too few tasks (fewer than nodes in the cluster), your jobs will run slower than they could otherwise. Having a block abstraction for a distributed filesystem brings several benefits. The first benefit is the most obvious: a file can be larger than any single disk in the network. There’s nothing that requires the blocks from a file to be stored on the same disk, so they can take advantage of any of the disks in the cluster. In fact, it would be possible, if unusual, to store a single file on an HDFS cluster whose blocks filled all the disks in the cluster. HDFS Concepts | 43
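The quick calculation in “Why Is a Block in HDFS So Large?” can be written out explicitly. This sketch uses the figures from the text (10 ms seek, 100 MB/s transfer, a 1% target). The function names are hypothetical and the model deliberately ignores everything except a single seek followed by a sequential transfer.

```python
def block_size_mb(seek_time_s, transfer_rate_mb_s, seek_fraction):
    """Block size (MB) at which the seek takes seek_fraction of the transfer time."""
    transfer_time_s = seek_time_s / seek_fraction
    return transfer_time_s * transfer_rate_mb_s


def seek_overhead(block_mb, seek_time_s, transfer_rate_mb_s):
    """Seek time as a fraction of the time to transfer one block."""
    return seek_time_s / (block_mb / transfer_rate_mb_s)


target = block_size_mb(seek_time_s=0.010, transfer_rate_mb_s=100, seek_fraction=0.01)
print(target, "MB")

# Overhead for the 64 MB default and the 128 MB size mentioned in the text.
for size in (64, 128):
    print(size, "MB:", seek_overhead(size, 0.010, 100))
```

Under these assumptions the 64 MB default puts the seek at roughly 1.6% of the transfer time, and 128 MB at roughly 0.8%.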
Slide 68: Second, making the unit of abstraction a block rather than a file simplifies the storage subsystem. Simplicity is something to strive for in all systems, but it is especially important for a distributed system in which the failure modes are so varied. The storage subsystem deals with blocks, simplifying storage management (since blocks are a fixed size, it is easy to calculate how many can be stored on a given disk) and eliminating metadata concerns (blocks are just a chunk of data to be stored; file metadata such as permissions information does not need to be stored with the blocks, so another system can handle metadata separately).
Furthermore, blocks fit well with replication for providing fault tolerance and availability. To insure against corrupted blocks and disk and machine failure, each block is replicated to a small number of physically separate machines (typically three). If a block becomes unavailable, a copy can be read from another location in a way that is transparent to the client. A block that is no longer available due to corruption or machine failure can be replicated from its alternative locations to other live machines to bring the replication factor back to the normal level. (See “Data Integrity” on page 75 for more on guarding against corrupt data.) Similarly, some applications may choose to set a high replication factor for the blocks in a popular file to spread the read load on the cluster.
Like its disk filesystem cousin, HDFS’s fsck command understands blocks. For example, running:
    % hadoop fsck / -files -blocks
will list the blocks that make up each file in the filesystem. (See also “Filesystem check (fsck)” on page 301.)
Namenodes and Datanodes
An HDFS cluster has two types of node operating in a master-worker pattern: a namenode (the master) and a number of datanodes (workers). The namenode manages the filesystem namespace. It maintains the filesystem tree and the metadata for all the files and directories in the tree.
This information is stored persistently on the local disk in the form of two files: the namespace image and the edit log. The namenode also knows the datanodes on which all the blocks for a given file are located; however, it does not store block locations persistently, since this information is reconstructed from datanodes when the system starts.
A client accesses the filesystem on behalf of the user by communicating with the namenode and datanodes. The client presents a POSIX-like filesystem interface, so the user code does not need to know about the namenode and datanode to function.
Datanodes are the workhorses of the filesystem. They store and retrieve blocks when they are told to (by clients or the namenode), and they report back to the namenode periodically with lists of blocks that they are storing.
Slide 69: Without the namenode, the filesystem cannot be used. In fact, if the machine running the namenode were obliterated, all the files on the filesystem would be lost since there would be no way of knowing how to reconstruct the files from the blocks on the datanodes. For this reason, it is important to make the namenode resilient to failure, and Hadoop provides two mechanisms for this. The first way is to back up the files that make up the persistent state of the filesystem metadata. Hadoop can be configured so that the namenode writes its persistent state to multiple filesystems. These writes are synchronous and atomic. The usual configuration choice is to write to local disk as well as a remote NFS mount. It is also possible to run a secondary namenode, which despite its name does not act as a namenode. Its main role is to periodically merge the namespace image with the edit log to prevent the edit log from becoming too large. The secondary namenode usually runs on a separate physical machine, since it requires plenty of CPU and as much memory as the namenode to perform the merge. It keeps a copy of the merged namespace image, which can be used in the event of the namenode failing. However, the state of the secondary namenode lags that of the primary, so in the event of total failure of the primary, data loss is almost certain. The usual course of action in this case is to copy the namenode’s metadata files that are on NFS to the secondary and run it as the new primary. See “The filesystem image and edit log” on page 294 for more details. The Command-Line Interface We’re going to have a look at HDFS by interacting with it from the command line. There are many other interfaces to HDFS, but the command line is one of the simplest and, to many developers, the most familiar. We are going to run HDFS on one machine, so first follow the instructions for setting up Hadoop in pseudo-distributed mode in Appendix A. 
Later you’ll see how to run on a cluster of machines to give us scalability and fault tolerance. There are two properties that we set in the pseudo-distributed configuration that deserve further explanation. The first is fs.default.name, set to hdfs://localhost/, which is used to set a default filesystem for Hadoop. Filesystems are specified by a URI, and here we have used an hdfs URI to configure Hadoop to use HDFS by default. The HDFS daemons will use this property to determine the host and port for the HDFS namenode. We’ll be running it on localhost, on the default HDFS port, 8020. And HDFS clients will use this property to work out where the namenode is running so they can connect to it. The Command-Line Interface | 45
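To make the role of fs.default.name concrete, here is a small sketch of how a host and port can be derived from an hdfs URI, falling back to port 8020 when the URI names none. It uses Python's standard `urllib.parse` rather than anything from Hadoop, and `namenode_address` is a hypothetical helper, so treat it as an illustration of the idea and not as Hadoop's own parsing code.

```python
from urllib.parse import urlparse

DEFAULT_NAMENODE_PORT = 8020  # default HDFS port named in the text


def namenode_address(default_fs_uri):
    """Return (host, port) for an hdfs URI, using the default port if none is given."""
    parsed = urlparse(default_fs_uri)
    if parsed.scheme != "hdfs":
        raise ValueError("not an hdfs URI: %r" % default_fs_uri)
    return parsed.hostname, parsed.port or DEFAULT_NAMENODE_PORT


# The value used in the pseudo-distributed configuration.
print(namenode_address("hdfs://localhost/"))

# A URI with an explicit port (the port number here is only an example).
print(namenode_address("hdfs://localhost:9000/"))
```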
Slide 70: We set the second property, dfs.replication, to 1 so that HDFS doesn’t replicate filesystem blocks by the default factor of three. When running with a single datanode, HDFS can’t replicate blocks to three datanodes, so it would perpetually warn about blocks being under-replicated. This setting solves that problem.
Basic Filesystem Operations
The filesystem is ready to be used, and we can do all of the usual filesystem operations such as reading files, creating directories, moving files, deleting data, and listing directories. You can type hadoop fs -help to get detailed help on every command.
Start by copying a file from the local filesystem to HDFS:
    % hadoop fs -copyFromLocal input/docs/quangle.txt hdfs://localhost/user/tom/quangle.txt
This command invokes Hadoop’s filesystem shell command fs, which supports a number of subcommands; in this case, we are running -copyFromLocal. The local file quangle.txt is copied to the file /user/tom/quangle.txt on the HDFS instance running on localhost. In fact, we could have omitted the scheme and host of the URI and picked up the default, hdfs://localhost, as specified in core-site.xml:
    % hadoop fs -copyFromLocal input/docs/quangle.txt /user/tom/quangle.txt
We could also have used a relative path and copied the file to our home directory in HDFS, which in this case is /user/tom:
    % hadoop fs -copyFromLocal input/docs/quangle.txt quangle.txt
Let’s copy the file back to the local filesystem and check whether it’s the same:
    % hadoop fs -copyToLocal quangle.txt quangle.copy.txt
    % md5 input/docs/quangle.txt quangle.copy.txt
    MD5 (input/docs/quangle.txt) = a16f231da6b05e2ba7a339320e7dacd9
    MD5 (quangle.copy.txt) = a16f231da6b05e2ba7a339320e7dacd9
The MD5 digests are the same, showing that the file survived its trip to HDFS and is back intact.
Finally, let’s look at an HDFS file listing. We create a directory first just to see how it is displayed in the listing:
    % hadoop fs -mkdir books
    % hadoop fs -ls .
    Found 2 items
    drwxr-xr-x   - tom supergroup          0 2009-04-02 22:41 /user/tom/books
    -rw-r--r--   1 tom supergroup        118 2009-04-02 22:29 /user/tom/quangle.txt
The information returned is very similar to the Unix command ls -l, with a few minor differences. The first column shows the file mode. The second column is the replication factor of the file (something a traditional Unix filesystem does not have). Remember we set the default replication factor in the site-wide configuration to be 1, which is why we see the same value here. The entry in this column is empty for directories since the
Slide 71: concept of replication does not apply to them: directories are treated as metadata and stored by the namenode, not the datanodes. The third and fourth columns show the file owner and group. The fifth column is the size of the file in bytes, or zero for directories. The sixth and seventh columns are the last modified date and time. Finally, the eighth column is the absolute name of the file or directory.
File Permissions in HDFS
HDFS has a permissions model for files and directories that is much like POSIX. There are three types of permission: the read permission (r), the write permission (w), and the execute permission (x). The read permission is required to read files or list the contents of a directory. The write permission is required to write a file, or for a directory, to create or delete files or directories in it. The execute permission is ignored for a file since you can’t execute a file on HDFS (unlike POSIX), and for a directory it is required to access its children.
Each file and directory has an owner, a group, and a mode. The mode is made up of the permissions for the user who is the owner, the permissions for the users who are members of the group, and the permissions for users who are neither the owners nor members of the group.
By default, a client’s identity is determined by the username and groups of the process it is running in. Because clients are remote, this makes it possible to become an arbitrary user, simply by creating an account of that name on the remote system. Thus, permissions should be used only in a cooperative community of users, as a mechanism for sharing filesystem resources and for avoiding accidental data loss, and not for securing resources in a hostile environment. (Note, however, that the latest versions of Hadoop support Kerberos authentication, which removes these restrictions; see “Security” on page 281.)
Despite these limitations, it is worthwhile having permissions enabled (as it is by default; see the dfs.permissions property), to avoid accidental modification or deletion of substantial parts of the filesystem, either by users or by automated tools or programs. When permissions checking is enabled, the owner permissions are checked if the client’s username matches the owner, and the group permissions are checked if the client is a member of the group; otherwise, the other permissions are checked. There is a concept of a super-user, which is the identity of the namenode process. Permissions checks are not performed for the super-user. Hadoop Filesystems Hadoop has an abstract notion of filesystem, of which HDFS is just one implementation. The Java abstract class org.apache.hadoop.fs.FileSystem represents a filesystem in Hadoop, and there are several concrete implementations, which are described in Table 3-1. Hadoop Filesystems | 47
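The permission check described under “File Permissions in HDFS” above (owner first, then group, otherwise other, with the super-user bypassing the check) can be sketched as a small function. This is a simplified model written for illustration, not HDFS source code. The function name `permitted`, its parameters, and the nine-character mode string are all assumptions made for this example.

```python
def permitted(mode, owner, group, user, user_groups, wanted, superuser=None):
    """Decide whether user may perform the wanted action ('r', 'w' or 'x').

    mode is a nine-character string such as 'rw-r--r--': three characters each
    for the owner, the group, and everyone else.
    """
    if superuser is not None and user == superuser:
        return True                 # no checks are performed for the super-user
    if user == owner:
        bits = mode[0:3]            # owner permissions apply
    elif group in user_groups:
        bits = mode[3:6]            # group permissions apply
    else:
        bits = mode[6:9]            # other permissions apply
    return wanted in bits


# quangle.txt from the earlier listing: -rw-r--r-- tom supergroup
mode, owner, group = "rw-r--r--", "tom", "supergroup"

print(permitted(mode, owner, group, "tom", ["supergroup"], "w"))   # owner write
print(permitted(mode, owner, group, "ann", ["supergroup"], "w"))   # group write
print(permitted(mode, owner, group, "bob", ["staff"], "r"))        # other read
```

Note that exactly one of the three permission sets is consulted: an owner is judged by the owner bits alone, even if the group bits would have been more permissive.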
Slide 72:
Table 3-1. Hadoop filesystems

Filesystem | URI scheme | Java implementation (all under org.apache.hadoop) | Description
Local | file | fs.LocalFileSystem | A filesystem for a locally connected disk with client-side checksums. Use RawLocalFileSystem for a local filesystem with no checksums. See “LocalFileSystem” on page 76.
HDFS | hdfs | hdfs.DistributedFileSystem | Hadoop’s distributed filesystem. HDFS is designed to work efficiently in conjunction with MapReduce.
HFTP | hftp | hdfs.HftpFileSystem | A filesystem providing read-only access to HDFS over HTTP. (Despite its name, HFTP has no connection with FTP.) Often used with distcp (see “Parallel Copying with distcp” on page 70) to copy data between HDFS clusters running different versions.
HSFTP | hsftp | hdfs.HsftpFileSystem | A filesystem providing read-only access to HDFS over HTTPS. (Again, this has no connection with FTP.)
HAR | har | fs.HarFileSystem | A filesystem layered on another filesystem for archiving files. Hadoop Archives are typically used for archiving files in HDFS to reduce the namenode’s memory usage. See “Hadoop Archives” on page 71.
KFS (CloudStore) | kfs | fs.kfs.KosmosFileSystem | CloudStore (formerly Kosmos filesystem) is a distributed filesystem like HDFS or Google’s GFS, written in C++. Find more information about it at http://kosmosfs.sourceforge.net/.
FTP | ftp | fs.ftp.FTPFileSystem | A filesystem backed by an FTP server.
S3 (native) | s3n | fs.s3native.NativeS3FileSystem | A filesystem backed by Amazon S3. See http://wiki.apache.org/hadoop/AmazonS3.
S3 (block-based) | s3 | fs.s3.S3FileSystem | A filesystem backed by Amazon S3, which stores files in blocks (much like HDFS) to overcome S3’s 5 GB file size limit.

Hadoop provides many interfaces to its filesystems, and it generally uses the URI scheme to pick the correct filesystem instance to communicate with. For example, the filesystem shell that we met in the previous section operates with all Hadoop filesystems.
To list the files in the root directory of the local filesystem, type:

    % hadoop fs -ls file:///

Although it is possible (and sometimes very convenient) to run MapReduce programs that access any of these filesystems, when you are processing large volumes of data, you should choose a distributed filesystem that has the data locality optimization, such as HDFS or KFS (see “Scaling Out” on page 27).
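The idea of choosing a filesystem from the URI scheme can be sketched in a few lines of plain Java. This is only an illustration: the scheme-to-class pairs are copied from Table 3-1, but the class SchemeLookupSketch and its implementationFor() method are hypothetical, and Hadoop itself resolves the implementation through its configuration rather than through a hard-coded map like this one.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

/** Illustrative lookup from URI scheme to the class names listed in Table 3-1. */
final class SchemeLookupSketch {

    private static final String BASE_PACKAGE = "org.apache.hadoop.";
    private static final Map<String, String> BY_SCHEME = new HashMap<>();

    static {
        BY_SCHEME.put("file", "fs.LocalFileSystem");
        BY_SCHEME.put("hdfs", "hdfs.DistributedFileSystem");
        BY_SCHEME.put("hftp", "hdfs.HftpFileSystem");
        BY_SCHEME.put("hsftp", "hdfs.HsftpFileSystem");
        BY_SCHEME.put("har", "fs.HarFileSystem");
        BY_SCHEME.put("kfs", "fs.kfs.KosmosFileSystem");
        BY_SCHEME.put("ftp", "fs.ftp.FTPFileSystem");
        BY_SCHEME.put("s3n", "fs.s3native.NativeS3FileSystem");
        BY_SCHEME.put("s3", "fs.s3.S3FileSystem");
    }

    /** Returns the fully qualified class name for the scheme of the given URI. */
    static String implementationFor(String uri) {
        String scheme = URI.create(uri).getScheme();   // null when the URI has no scheme
        String implementation = (scheme == null) ? null : BY_SCHEME.get(scheme);
        if (implementation == null) {
            throw new IllegalArgumentException("No filesystem known for: " + uri);
        }
        return BASE_PACKAGE + implementation;
    }

    public static void main(String[] args) {
        System.out.println(implementationFor("file:///"));
        System.out.println(implementationFor("hdfs://localhost/user/tom/quangle.txt"));
    }
}
```

In this sketch a URI without a scheme is rejected outright; that is a simplification chosen to keep the example small.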
Slide 73:
Interfaces

Hadoop is written in Java, and all Hadoop filesystem interactions are mediated through the Java API.‖ The filesystem shell, for example, is a Java application that uses the Java FileSystem class to provide filesystem operations. The other filesystem interfaces are discussed briefly in this section. These interfaces are most commonly used with HDFS, since the other filesystems in Hadoop typically have existing tools to access the underlying filesystem (FTP clients for FTP, S3 tools for S3, etc.), but many of them will work with any Hadoop filesystem.

‖ The RPC interfaces in Hadoop are based on Hadoop’s Writable interface, which is Java-centric. In the future, Hadoop will adopt Avro, a cross-language RPC framework, which will allow native HDFS clients to be written in languages other than Java.

Thrift

By exposing its filesystem interface as a Java API, Hadoop makes it awkward for non-Java applications to access Hadoop filesystems. The Thrift API in the “thriftfs” contrib module remedies this deficiency by exposing Hadoop filesystems as an Apache Thrift service, making it easy for any language that has Thrift bindings to interact with a Hadoop filesystem, such as HDFS.

To use the Thrift API, run a Java server that exposes the Thrift service and acts as a proxy to the Hadoop filesystem. Your application accesses the Thrift service, which is typically running on the same machine as your application.

The Thrift API comes with a number of pregenerated stubs for a variety of languages, including C++, Perl, PHP, Python, and Ruby. Thrift has support for versioning, so it’s a good choice if you want to access different versions of a Hadoop filesystem from the same client code (you will need to run a proxy for each version of Hadoop to achieve this, however).

For installation and usage instructions, please refer to the documentation in the src/contrib/thriftfs directory of the Hadoop distribution.

C

Hadoop provides a C library called libhdfs that mirrors the Java FileSystem interface (it was written as a C library for accessing HDFS, but despite its name it can be used to access any Hadoop filesystem). It works using the Java Native Interface (JNI) to call a Java filesystem client.

The C API is very similar to the Java one, but it typically lags the Java one, so newer features may not be supported. You can find the generated documentation for the C API in the libhdfs/docs/api directory of the Hadoop distribution.
Slide 74:
Hadoop comes with prebuilt libhdfs binaries for 32-bit Linux, but for other platforms, you will need to build them yourself using the instructions at http://wiki.apache.org/hadoop/LibHDFS.

FUSE

Filesystem in Userspace (FUSE) allows filesystems that are implemented in user space to be integrated as a Unix filesystem. Hadoop’s Fuse-DFS contrib module allows any Hadoop filesystem (but typically HDFS) to be mounted as a standard filesystem. You can then use Unix utilities (such as ls and cat) to interact with the filesystem, as well as POSIX libraries to access the filesystem from any programming language.

Fuse-DFS is implemented in C using libhdfs as the interface to HDFS. Documentation for compiling and running Fuse-DFS is located in the src/contrib/fuse-dfs directory of the Hadoop distribution.

WebDAV

WebDAV is a set of extensions to HTTP to support editing and updating files. WebDAV shares can be mounted as filesystems on most operating systems, so by exposing HDFS (or other Hadoop filesystems) over WebDAV, it’s possible to access HDFS as a standard filesystem.

At the time of this writing, WebDAV support in Hadoop (which is implemented by calling the Java API to Hadoop) is still under development, and can be tracked at https://issues.apache.org/jira/browse/HADOOP-496.

Other HDFS Interfaces

There are two interfaces that are specific to HDFS:

HTTP
    HDFS defines a read-only interface for retrieving directory listings and data over HTTP. Directory listings are served by the namenode’s embedded web server (which runs on port 50070) in XML format, while file data is streamed from datanodes by their web servers (running on port 50075). This protocol is not tied to a specific HDFS version, making it possible to write clients that can use HTTP to read data from HDFS clusters that run different versions of Hadoop. HftpFileSystem is such a client: it is a Hadoop filesystem that talks to HDFS over HTTP (HsftpFileSystem is the HTTPS variant).

FTP
    Although not complete at the time of this writing (https://issues.apache.org/jira/browse/HADOOP-3199), there is an FTP interface to HDFS, which permits the use of the FTP protocol to interact with HDFS. This interface is a convenient way to transfer data into and out of HDFS using existing FTP clients.
Slide 75:
The FTP interface to HDFS is not to be confused with FTPFileSystem, which exposes any FTP server as a Hadoop filesystem.

The Java Interface

In this section, we dig into Hadoop’s FileSystem class: the API for interacting with one of Hadoop’s filesystems.# While we focus mainly on the HDFS implementation, DistributedFileSystem, in general you should strive to write your code against the FileSystem abstract class, to retain portability across filesystems. This is very useful when testing your program, for example, since you can rapidly run tests using data stored on the local filesystem.

# From release 0.21.0, there is a new filesystem interface called FileContext with better handling of multiple filesystems (so a single FileContext can resolve multiple filesystem schemes, for example) and a cleaner, more consistent interface.

Reading Data from a Hadoop URL

One of the simplest ways to read a file from a Hadoop filesystem is by using a java.net.URL object to open a stream to read the data from. The general idiom is:

    InputStream in = null;
    try {
      in = new URL("hdfs://host/path").openStream();
      // process in
    } finally {
      IOUtils.closeStream(in);
    }

There’s a little bit more work required to make Java recognize Hadoop’s hdfs URL scheme. This is achieved by calling the setURLStreamHandlerFactory method on URL with an instance of FsUrlStreamHandlerFactory. This method can only be called once per JVM, so it is typically executed in a static block. This limitation means that if some other part of your program (perhaps a third-party component outside your control) sets a URLStreamHandlerFactory, you won’t be able to use this approach for reading data from Hadoop. The next section discusses an alternative.

Example 3-1 shows a program for displaying files from Hadoop filesystems on standard output, like the Unix cat command.

Example 3-1. Displaying files from a Hadoop filesystem on standard output using a URLStreamHandler

    import java.io.InputStream;
    import java.net.URL;

    import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;
    import org.apache.hadoop.io.IOUtils;

    public class URLCat {

      static {
        // Can be set only once per JVM, hence the static block
        URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
      }

      public static void main(String[] args) throws Exception {
        InputStream in = null;
        try {
          in = new URL(args[0]).openStream();
          IOUtils.copyBytes(in, System.out, 4096, false);
        } finally {
          IOUtils.closeStream(in);
        }
      }
    }

Slide 76:
We make use of the handy IOUtils class that comes with Hadoop for closing the stream in the finally clause, and also for copying bytes between the input stream and the output stream (System.out in this case). The last two arguments to the copyBytes method are the buffer size used for copying and whether to close the streams when the copy is complete. We close the input stream ourselves, and System.out doesn’t need to be closed.

Here’s a sample run:*

    % hadoop URLCat hdfs://localhost/user/tom/quangle.txt
    On the top of the Crumpetty Tree
    The Quangle Wangle sat,
    But his face you could not see,
    On account of his Beaver Hat.

Reading Data Using the FileSystem API

As the previous section explained, sometimes it is impossible to set a URLStreamHandlerFactory for your application. In this case, you will need to use the FileSystem API to open an input stream for a file.

A file in a Hadoop filesystem is represented by a Hadoop Path object (and not a java.io.File object, since its semantics are too closely tied to the local filesystem). You can think of a Path as a Hadoop filesystem URI, such as hdfs://localhost/user/tom/quangle.txt.

FileSystem is a general filesystem API, so the first step is to retrieve an instance for the filesystem we want to use (HDFS in this case). There are two static factory methods for getting a FileSystem instance:

    public static FileSystem get(Configuration conf) throws IOException
    public static FileSystem get(URI uri, Configuration conf) throws IOException

A Configuration object encapsulates a client or server’s configuration, which is set using configuration files read from the classpath, such as conf/core-site.xml.
The first method returns the default filesystem (as specified in the file conf/core-site.xml, or the default local filesystem if not specified there). The second uses the given URI’s scheme and authority to determine the filesystem to use, falling back to the default filesystem if no scheme is specified in the given URI.

* The text is from The Quangle Wangle’s Hat by Edward Lear.

Slide 77:
With a FileSystem instance in hand, we invoke an open() method to get the input stream for a file:

    public FSDataInputStream open(Path f) throws IOException
    public abstract FSDataInputStream open(Path f, int bufferSize) throws IOException

The first method uses a default buffer size of 4 KB.

Putting this together, we can rewrite Example 3-1 as shown in Example 3-2.

Example 3-2. Displaying files from a Hadoop filesystem on standard output by using the FileSystem directly

    import java.io.InputStream;
    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;

    public class FileSystemCat {

      public static void main(String[] args) throws Exception {
        String uri = args[0];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        InputStream in = null;
        try {
          in = fs.open(new Path(uri));
          IOUtils.copyBytes(in, System.out, 4096, false);
        } finally {
          IOUtils.closeStream(in);
        }
      }
    }

The program runs as follows:

    % hadoop FileSystemCat hdfs://localhost/user/tom/quangle.txt
    On the top of the Crumpetty Tree
    The Quangle Wangle sat,
    But his face you could not see,
    On account of his Beaver Hat.

FSDataInputStream

The open() method on FileSystem actually returns an FSDataInputStream rather than a standard java.io class. This class is a specialization of java.io.DataInputStream with support for random access, so you can read from any part of the stream:

    package org.apache.hadoop.fs;

    public class FSDataInputStream extends DataInputStream
        implements Seekable, PositionedReadable {
      // implementation elided
    }
Slide 78:
The Seekable interface permits seeking to a position in the file and provides a query method for the current offset from the start of the file (getPos()):

    public interface Seekable {
      void seek(long pos) throws IOException;
      long getPos() throws IOException;
    }

Calling seek() with a position that is greater than the length of the file will result in an IOException. Unlike the skip() method of java.io.InputStream, which positions the stream at a point later than the current position, seek() can move to an arbitrary, absolute position in the file.

Example 3-3 is a simple extension of Example 3-2 that writes a file to standard out twice: after writing it once, it seeks to the start of the file and streams through it once again.

Example 3-3. Displaying files from a Hadoop filesystem on standard output twice, by using seek

    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;

    public class FileSystemDoubleCat {

      public static void main(String[] args) throws Exception {
        String uri = args[0];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        FSDataInputStream in = null;
        try {
          in = fs.open(new Path(uri));
          IOUtils.copyBytes(in, System.out, 4096, false);
          in.seek(0); // go back to the start of the file
          IOUtils.copyBytes(in, System.out, 4096, false);
        } finally {
          IOUtils.closeStream(in);
        }
      }
    }

Here’s the result of running it on a small file:

    % hadoop FileSystemDoubleCat hdfs://localhost/user/tom/quangle.txt
    On the top of the Crumpetty Tree
    The Quangle Wangle sat,
    But his face you could not see,
    On account of his Beaver Hat.
    On the top of the Crumpetty Tree
    The Quangle Wangle sat,
    But his face you could not see,
    On account of his Beaver Hat.

FSDataInputStream also implements the PositionedReadable interface for reading parts of a file at a given offset:

    public interface PositionedReadable {

      public int read(long position, byte[] buffer, int offset, int length)
        throws IOException;

      public void readFully(long position, byte[] buffer, int offset, int length)
        throws IOException;

      public void readFully(long position, byte[] buffer) throws IOException;
    }

Slide 79:
The read() method reads up to length bytes from the given position in the file into the buffer at the given offset in the buffer. The return value is the number of bytes actually read: callers should check this value as it may be less than length. The readFully() methods will read length bytes into the buffer (or buffer.length bytes for the version that just takes a byte array buffer), unless the end of the file is reached, in which case an EOFException is thrown.

All of these methods preserve the current offset in the file and are thread-safe, so they provide a convenient way to access another part of the file (metadata, perhaps) while reading the main body of the file. In fact, they are implemented with the Seekable interface, using the following pattern:

    long oldPos = getPos();
    try {
      seek(position);
      // read data
    } finally {
      seek(oldPos);
    }

Finally, bear in mind that calling seek() is a relatively expensive operation and should be used sparingly. You should structure your application access patterns to rely on streaming data (by using MapReduce, for example) rather than performing a large number of seeks.

Writing Data

The FileSystem class has a number of methods for creating a file. The simplest is the method that takes a Path object for the file to be created and returns an output stream to write to:

    public FSDataOutputStream create(Path f) throws IOException

There are overloaded versions of this method that allow you to specify whether to forcibly overwrite existing files, the replication factor of the file, the buffer size to use when writing the file, the block size for the file, and file permissions.

The create() methods create any parent directories of the file to be written that don’t already exist. Though convenient, this behavior may be unexpected. If you want the write to fail if the parent directory doesn’t exist, then you should check for the existence of the parent directory first by calling the exists() method.
Slide 80:
There’s also an overloaded method for passing a callback interface, Progressable, so your application can be notified of the progress of the data being written to the datanodes:

    package org.apache.hadoop.util;

    public interface Progressable {
      public void progress();
    }

As an alternative to creating a new file, you can append to an existing file using the append() method (there are also some other overloaded versions):

    public FSDataOutputStream append(Path f) throws IOException

The append operation allows a single writer to modify an already written file by opening it and writing data from the final offset in the file. With this API, applications that produce unbounded files, such as logfiles, can write to an existing file after a restart, for example. The append operation is optional and not implemented by all Hadoop filesystems. For example, HDFS supports append, but S3 filesystems don’t.

Example 3-4 shows how to copy a local file to a Hadoop filesystem. We illustrate progress by printing a period every time the progress() method is called by Hadoop, which is after each 64 KB packet of data is written to the datanode pipeline. (Note that this particular behavior is not specified by the API, so it is subject to change in later versions of Hadoop. The API merely allows you to infer that “something is happening.”)

Example 3-4. Copying a local file to a Hadoop filesystem

    import java.io.BufferedInputStream;
    import java.io.FileInputStream;
    import java.io.InputStream;
    import java.io.OutputStream;
    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.util.Progressable;

    public class FileCopyWithProgress {

      public static void main(String[] args) throws Exception {
        String localSrc = args[0];
        String dst = args[1];

        InputStream in = new BufferedInputStream(new FileInputStream(localSrc));

        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(dst), conf);
        OutputStream out = fs.create(new Path(dst), new Progressable() {
          public void progress() {
            System.out.print(".");
          }
        });

        // The final argument closes both streams when the copy completes
        IOUtils.copyBytes(in, out, 4096, true);
      }
    }

Typical usage:

    % hadoop FileCopyWithProgress input/docs/1400-8.txt hdfs://localhost/user/tom/1400-8.txt
    ...............
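The callback idea can be tried without a cluster. The sketch below is plain Java with an in-memory stream standing in for the datanode pipeline. The Progress interface is a hypothetical stand-in with the same shape as Progressable, and the choice to invoke it once per 64 KB chunk simply imitates the behavior described above; it is an assumption of this sketch, not something the Hadoop API guarantees.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

/** Copies a stream in fixed-size chunks and reports progress after each one. */
final class ProgressCopySketch {

    /** Hypothetical stand-in with the same shape as Hadoop's Progressable. */
    interface Progress {
        void progress();
    }

    static final int PACKET_SIZE = 64 * 1024;

    /** Returns the number of bytes copied; calls back once per chunk written. */
    static long copy(InputStream in, OutputStream out, Progress callback) throws IOException {
        byte[] packet = new byte[PACKET_SIZE];
        long total = 0;
        int filled;
        while ((filled = fill(in, packet)) > 0) {
            out.write(packet, 0, filled);
            total += filled;
            callback.progress();
        }
        return total;
    }

    /** Reads until the buffer is full or the stream ends; returns the bytes read. */
    private static int fill(InputStream in, byte[] buffer) throws IOException {
        int filled = 0;
        while (filled < buffer.length) {
            int n = in.read(buffer, filled, buffer.length - filled);
            if (n < 0) {
                break;                 // end of stream
            }
            filled += n;
        }
        return filled;
    }

    public static void main(String[] args) throws IOException {
        byte[] data = new byte[200 * 1024];
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        long copied = copy(new ByteArrayInputStream(data), sink, () -> System.out.print("."));
        System.out.println();
        System.out.println(copied + " bytes copied");
    }
}
```

The caller learns only that another chunk has gone out, which matches the "something is happening" level of detail that a progress callback offers.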
Slide 81:
Currently, none of the other Hadoop filesystems call progress() during writes. Progress is important in MapReduce applications, as you will see in later chapters.

FSDataOutputStream

The create() method on FileSystem returns an FSDataOutputStream, which, like FSDataInputStream, has a method for querying the current position in the file:

    package org.apache.hadoop.fs;

    public class FSDataOutputStream extends DataOutputStream implements Syncable {

      public long getPos() throws IOException {
        // implementation elided
      }

      // implementation elided
    }

However, unlike FSDataInputStream, FSDataOutputStream does not permit seeking. This is because HDFS allows only sequential writes to an open file or appends to an already written file. In other words, there is no support for writing to anywhere other than the end of the file, so there is no value in being able to seek while writing.

Directories

FileSystem provides a method to create a directory:

    public boolean mkdirs(Path f) throws IOException

This method creates all of the necessary parent directories if they don’t already exist, just like the java.io.File’s mkdirs() method. It returns true if the directory (and all parent directories) was (were) successfully created.

Often, you don’t need to explicitly create a directory, since writing a file, by calling create(), will automatically create any parent directories.

Querying the Filesystem

File metadata: FileStatus

An important feature of any filesystem is the ability to navigate its directory structure and retrieve information about the files and directories that it stores. The FileStatus class encapsulates filesystem metadata for files and directories, including file length, block size, replication, modification time, ownership, and permission information.

The method getFileStatus() on FileSystem provides a way of getting a FileStatus object for a single file or directory. Example 3-5 shows an example of its use.
Slide 82:
Example 3-5. Demonstrating file status information

    import static org.hamcrest.Matchers.is;
    import static org.hamcrest.Matchers.lessThanOrEqualTo;
    import static org.junit.Assert.assertThat;

    import java.io.FileNotFoundException;
    import java.io.IOException;
    import java.io.OutputStream;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.MiniDFSCluster;
    import org.junit.After;
    import org.junit.Before;
    import org.junit.Test;

    public class ShowFileStatusTest {

      private MiniDFSCluster cluster; // use an in-process HDFS cluster for testing
      private FileSystem fs;

      @Before
      public void setUp() throws IOException {
        Configuration conf = new Configuration();
        if (System.getProperty("test.build.data") == null) {
          System.setProperty("test.build.data", "/tmp");
        }
        cluster = new MiniDFSCluster(conf, 1, true, null);
        fs = cluster.getFileSystem();
        OutputStream out = fs.create(new Path("/dir/file"));
        out.write("content".getBytes("UTF-8"));
        out.close();
      }

      @After
      public void tearDown() throws IOException {
        if (fs != null) { fs.close(); }
        if (cluster != null) { cluster.shutdown(); }
      }

      @Test(expected = FileNotFoundException.class)
      public void throwsFileNotFoundForNonExistentFile() throws IOException {
        fs.getFileStatus(new Path("no-such-file"));
      }

      @Test
      public void fileStatusForFile() throws IOException {
        Path file = new Path("/dir/file");
        FileStatus stat = fs.getFileStatus(file);
        assertThat(stat.getPath().toUri().getPath(), is("/dir/file"));
        assertThat(stat.isDir(), is(false));
        assertThat(stat.getLen(), is(7L));
        assertThat(stat.getModificationTime(),
            is(lessThanOrEqualTo(System.currentTimeMillis())));
        assertThat(stat.getReplication(), is((short) 1));
        assertThat(stat.getBlockSize(), is(64 * 1024 * 1024L));
        assertThat(stat.getOwner(), is("tom"));
        assertThat(stat.getGroup(), is("supergroup"));
        assertThat(stat.getPermission().toString(), is("rw-r--r--"));
      }

      @Test
      public void fileStatusForDirectory() throws IOException {
        Path dir = new Path("/dir");
        FileStatus stat = fs.getFileStatus(dir);
        assertThat(stat.getPath().toUri().getPath(), is("/dir"));
        assertThat(stat.isDir(), is(true));
        assertThat(stat.getLen(), is(0L));
        assertThat(stat.getModificationTime(),
            is(lessThanOrEqualTo(System.currentTimeMillis())));
        assertThat(stat.getReplication(), is((short) 0));
        assertThat(stat.getBlockSize(), is(0L));
        assertThat(stat.getOwner(), is("tom"));
        assertThat(stat.getGroup(), is("supergroup"));
        assertThat(stat.getPermission().toString(), is("rwxr-xr-x"));
      }
    }

Slide 83:
If no file or directory exists, a FileNotFoundException is thrown. However, if you are interested only in the existence of a file or directory, then the exists() method on FileSystem is more convenient:

    public boolean exists(Path f) throws IOException

Listing files

Finding information on a single file or directory is useful, but you also often need to be able to list the contents of a directory. That’s what FileSystem’s listStatus() methods are for:

    public FileStatus[] listStatus(Path f) throws IOException
    public FileStatus[] listStatus(Path f, PathFilter filter) throws IOException
    public FileStatus[] listStatus(Path[] files) throws IOException
    public FileStatus[] listStatus(Path[] files, PathFilter filter) throws IOException

When the argument is a file, the simplest variant returns an array of FileStatus objects of length 1. When the argument is a directory, it returns zero or more FileStatus objects representing the files and directories contained in the directory.

Overloaded variants allow a PathFilter to be supplied to restrict the files and directories to match; you will see an example in section “PathFilter” on page 61. Finally, if you specify an array of paths, the result is a shortcut for calling the equivalent single-path listStatus method for each path in turn and accumulating the FileStatus object arrays in a single array. This can be useful for building up lists of input files to process from distinct parts of the filesystem tree. Example 3-6 is a simple demonstration of this idea. Note the use of stat2Paths() in FileUtil for turning an array of FileStatus objects to an array of Path objects.

Example 3-6.
Showing the file statuses for a collection of paths in a Hadoop filesystem

    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.FileUtil;
    import org.apache.hadoop.fs.Path;

    public class ListStatus {

      public static void main(String[] args) throws Exception {
        String uri = args[0];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);

        Path[] paths = new Path[args.length];
        for (int i = 0; i < paths.length; i++) {
          paths[i] = new Path(args[i]);
        }

        FileStatus[] status = fs.listStatus(paths);
        Path[] listedPaths = FileUtil.stat2Paths(status);
        for (Path p : listedPaths) {
          System.out.println(p);
        }
      }
    }

Slide 84:
We can use this program to find the union of directory listings for a collection of paths:

    % hadoop ListStatus hdfs://localhost/ hdfs://localhost/user/tom
    hdfs://localhost/user
    hdfs://localhost/user/tom/books
    hdfs://localhost/user/tom/quangle.txt

File patterns

It is a common requirement to process sets of files in a single operation. For example, a MapReduce job for log processing might analyze a month’s worth of files contained in a number of directories. Rather than having to enumerate each file and directory to specify the input, it is convenient to use wildcard characters to match multiple files with a single expression, an operation that is known as globbing. Hadoop provides two FileSystem methods for processing globs:

    public FileStatus[] globStatus(Path pathPattern) throws IOException
    public FileStatus[] globStatus(Path pathPattern, PathFilter filter) throws IOException

The globStatus() method returns an array of FileStatus objects whose paths match the supplied pattern, sorted by path. An optional PathFilter can be specified to restrict the matches further.

Hadoop supports the same set of glob characters as Unix bash (see Table 3-2).

Table 3-2. Glob characters and their meanings

Glob | Name | Matches
* | asterisk | Matches zero or more characters
? | question mark | Matches a single character
[ab] | character class | Matches a single character in the set {a, b}
[^ab] | negated character class | Matches a single character that is not in the set {a, b}
[a-b] | character range | Matches a single character in the (closed) range [a, b], where a is lexicographically less than or equal to b
[^a-b] | negated character range | Matches a single character that is not in the (closed) range [a, b], where a is lexicographically less than or equal to b
{a,b} | alternation | Matches either expression a or b
\c | escaped character | Matches character c when it is a metacharacter
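One way to get a feel for the rules in Table 3-2 is to translate a glob into a java.util.regex pattern and run it over a list of candidate paths. The sketch below does that in plain Java. It is not how Hadoop implements globStatus(), and it makes several assumptions that are labeled in the comments: wildcards are taken not to cross a "/" separator, character classes are copied into the regex unchanged, and braces are expected to be balanced. The class GlobSketch, its methods, and the sample paths are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

/** Translates the glob characters of Table 3-2 into a regular expression. */
final class GlobSketch {

    static String toRegex(String glob) {
        StringBuilder regex = new StringBuilder();
        int braceDepth = 0;
        for (int i = 0; i < glob.length(); i++) {
            char c = glob.charAt(i);
            if (c == '\\' && i + 1 < glob.length()) {
                appendLiteral(regex, glob.charAt(++i));    // \c: escaped character
            } else if (c == '*') {
                regex.append("[^/]*");                     // assumption: stays within one path component
            } else if (c == '?') {
                regex.append("[^/]");
            } else if (c == '[' && glob.indexOf(']', i + 1) > 0) {
                int close = glob.indexOf(']', i + 1);
                regex.append(glob, i, close + 1);          // [ab], [^ab], [a-b] copied unchanged
                i = close;
            } else if (c == '{') {
                braceDepth++;
                regex.append("(?:");                       // assumption: braces are balanced
            } else if (c == '}' && braceDepth > 0) {
                braceDepth--;
                regex.append(')');
            } else if (c == ',' && braceDepth > 0) {
                regex.append('|');                         // alternation inside {a,b}
            } else {
                appendLiteral(regex, c);
            }
        }
        return regex.toString();
    }

    /** Escapes anything that is not a letter or digit so the regex treats it literally. */
    private static void appendLiteral(StringBuilder regex, char c) {
        if (!Character.isLetterOrDigit(c)) {
            regex.append('\\');
        }
        regex.append(c);
    }

    /** Returns the candidate paths that the glob matches in full, in input order. */
    static List<String> expand(String glob, List<String> candidates) {
        Pattern pattern = Pattern.compile(toRegex(glob));
        List<String> matches = new ArrayList<>();
        for (String path : candidates) {
            if (pattern.matcher(path).matches()) {
                matches.add(path);
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        List<String> paths = Arrays.asList("/2007", "/2007/12", "/2007/12/30", "/2007/12/31",
                "/2008", "/2008/01", "/2008/01/01", "/2008/01/02");
        System.out.println(expand("/*/12/*", paths));
        System.out.println(expand("/*/*/{31,01}", paths));
    }
}
```

Unlike globStatus(), this sketch only filters strings it is handed; it never consults a filesystem.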
Slide 85:
Imagine that logfiles are stored in a directory structure organized hierarchically by date. So, for example, logfiles for the last day of 2007 would go in a directory named /2007/12/31. Suppose that the full file listing is:

• /2007/12/30
• /2007/12/31
• /2008/01/01
• /2008/01/02

Here are some file globs and their expansions:

Glob | Expansion
/* | /2007 /2008
/*/* | /2007/12 /2008/01
/*/12/* | /2007/12/30 /2007/12/31
/200? | /2007 /2008
/200[78] | /2007 /2008
/200[7-8] | /2007 /2008
/200[^01234569] | /2007 /2008
/*/*/{31,01} | /2007/12/31 /2008/01/01
/*/*/3{0,1} | /2007/12/30 /2007/12/31
/*/{12/31,01/01} | /2007/12/31 /2008/01/01

PathFilter

Glob patterns are not always powerful enough to describe a set of files you want to access. For example, it is not generally possible to exclude a particular file using a glob pattern. The listStatus() and globStatus() methods of FileSystem take an optional PathFilter, which allows programmatic control over matching:

    package org.apache.hadoop.fs;

    public interface PathFilter {
      boolean accept(Path path);
    }

PathFilter is the equivalent of java.io.FileFilter for Path objects rather than File objects.

Example 3-7 shows a PathFilter for excluding paths that match a regular expression.
Slide 86:
Example 3-7. A PathFilter for excluding paths that match a regular expression

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.PathFilter;

    public class RegexExcludePathFilter implements PathFilter {

      private final String regex;

      public RegexExcludePathFilter(String regex) {
        this.regex = regex;
      }

      public boolean accept(Path path) {
        // Accept only paths that do not match the regular expression
        return !path.toString().matches(regex);
      }
    }

The filter passes only files that don’t match the regular expression. We use the filter in conjunction with a glob that picks out an initial set of files to include: the filter is used to refine the results. For example:

    fs.globStatus(new Path("/2007/*/*"), new RegexExcludePathFilter("^.*/2007/12/31$"))

will expand to /2007/12/30.

Filters can only act on a file’s name, as represented by a Path. They can’t use a file’s properties, such as creation time, as the basis of the filter. Nevertheless, they can perform matching that neither glob patterns nor regular expressions can achieve. For example, if you store files in a directory structure that is laid out by date (like in the previous section), then you can write a PathFilter to pick out files that fall in a given date range.

Deleting Data

Use the delete() method on FileSystem to permanently remove files or directories:

    public boolean delete(Path f, boolean recursive) throws IOException

If f is a file or an empty directory, then the value of recursive is ignored. A nonempty directory is only deleted, along with its contents, if recursive is true (otherwise an IOException is thrown).

Data Flow

Anatomy of a File Read

To get an idea of how data flows between the client interacting with HDFS, the namenode and the datanodes, consider Figure 3-1, which shows the main sequence of events when reading a file.
Slide 87:
Figure 3-1. A client reading data from HDFS

The client opens the file it wishes to read by calling open() on the FileSystem object, which for HDFS is an instance of DistributedFileSystem (step 1 in Figure 3-1). DistributedFileSystem calls the namenode, using RPC, to determine the locations of the first few blocks in the file (step 2). For each block, the namenode returns the addresses of the datanodes that have a copy of that block. Furthermore, the datanodes are sorted according to their proximity to the client (according to the topology of the cluster’s network; see “Network Topology and Hadoop” on page 64). If the client is itself a datanode (in the case of a MapReduce task, for instance), then it will read from the local datanode, if it hosts a copy of the block.

The DistributedFileSystem returns an FSDataInputStream (an input stream that supports file seeks) to the client for it to read data from. FSDataInputStream in turn wraps a DFSInputStream, which manages the datanode and namenode I/O.

The client then calls read() on the stream (step 3). DFSInputStream, which has stored the datanode addresses for the first few blocks in the file, then connects to the first (closest) datanode for the first block in the file. Data is streamed from the datanode back to the client, which calls read() repeatedly on the stream (step 4). When the end of the block is reached, DFSInputStream will close the connection to the datanode, then find the best datanode for the next block (step 5). This happens transparently to the client, which from its point of view is just reading a continuous stream.

Blocks are read in order, with the DFSInputStream opening new connections to datanodes as the client reads through the stream. It will also call the namenode to retrieve the datanode locations for the next batch of blocks as needed. When the client has finished reading, it calls close() on the FSDataInputStream (step 6).
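The way a reader picks one datanode per block can be modeled in a few lines. The sketch below is a toy simulation in plain Java, not DFSInputStream: each block comes with its replica locations already sorted closest-first, as the namenode would return them, and the reader takes the first usable one. As an assumption of the sketch, a node that cannot be reached is skipped and remembered so it is not tried again for later blocks. The class name BlockReadSketch and the node names dn1 to dn3 are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy model of choosing a datanode for each block of a file, in block order. */
final class BlockReadSketch {

    /**
     * @param replicasByBlock for each block, its datanodes sorted closest-first
     * @param unreachable     nodes that fail when contacted (simulated failures)
     * @return the datanode read from for each block
     */
    static List<String> chooseDatanodes(List<List<String>> replicasByBlock,
                                        Set<String> unreachable) {
        Set<String> knownBad = new HashSet<>();   // failures remembered across blocks
        List<String> chosen = new ArrayList<>();
        for (List<String> replicas : replicasByBlock) {
            String source = null;
            for (String node : replicas) {
                if (knownBad.contains(node)) {
                    continue;                     // do not retry a node that already failed
                }
                if (unreachable.contains(node)) {
                    knownBad.add(node);           // simulated connection error
                    continue;
                }
                source = node;                    // closest usable replica
                break;
            }
            if (source == null) {
                throw new IllegalStateException("no usable replica for a block");
            }
            chosen.add(source);
        }
        return chosen;
    }

    public static void main(String[] args) {
        List<List<String>> blocks = Arrays.asList(
                Arrays.asList("dn1", "dn2", "dn3"),
                Arrays.asList("dn1", "dn3"),
                Arrays.asList("dn2", "dn1"));
        System.out.println(chooseDatanodes(blocks, Collections.singleton("dn1")));
    }
}
```

The namenode appears here only as the source of the sorted lists; all data would flow directly between the reader and the chosen datanodes.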
Slide 88: During reading, if the DFSInputStream encounters an error while communicating with a datanode, then it will try the next closest one for that block. It will also remember datanodes that have failed so that it doesn’t needlessly retry them for later blocks. The DFSInputStream also verifies checksums for the data transferred to it from the datanode. If a corrupted block is found, it is reported to the namenode before the DFSInputStream attempts to read a replica of the block from another datanode.

One important aspect of this design is that the client contacts datanodes directly to retrieve data and is guided by the namenode to the best datanode for each block. This design allows HDFS to scale to a large number of concurrent clients, since the data traffic is spread across all the datanodes in the cluster. The namenode meanwhile merely has to service block location requests (the locations are held in memory, which makes these requests very efficient) and does not, for example, serve data, which would quickly become a bottleneck as the number of clients grew.

Network Topology and Hadoop

What does it mean for two nodes in a local network to be “close” to each other? In the context of high-volume data processing, the limiting factor is the rate at which we can transfer data between nodes; bandwidth is a scarce commodity. The idea is to use the bandwidth between two nodes as a measure of distance.

Rather than measuring bandwidth between nodes, which can be difficult to do in practice (it requires a quiet cluster, and the number of pairs of nodes in a cluster grows as the square of the number of nodes), Hadoop takes a simple approach in which the network is represented as a tree and the distance between two nodes is the sum of their distances to their closest common ancestor. Levels in the tree are not predefined, but it is common to have levels that correspond to the data center, the rack, and the node that a process is running on.
The idea is that the bandwidth available for each of the following scenarios becomes progressively less:

• Processes on the same node
• Different nodes on the same rack
• Nodes on different racks in the same data center
• Nodes in different data centers†

For example, imagine a node n1 on rack r1 in data center d1. This can be represented as /d1/r1/n1. Using this notation, here are the distances for the four scenarios:

• distance(/d1/r1/n1, /d1/r1/n1) = 0 (processes on the same node)
• distance(/d1/r1/n1, /d1/r1/n2) = 2 (different nodes on the same rack)
• distance(/d1/r1/n1, /d1/r2/n3) = 4 (nodes on different racks in the same data center)
• distance(/d1/r1/n1, /d2/r3/n4) = 6 (nodes in different data centers)

† At the time of this writing, Hadoop is not suited for running across data centers.
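The tree-distance rule behind these four values can be sketched in a few lines of Java. This is only an illustration of the arithmetic, not Hadoop’s own topology code: the class and method names (NetworkDistanceSketch, components, distance) are made up for the example, and locations are assumed to be strings of the form /datacenter/rack/node.

```java
import java.util.Arrays;
import java.util.List;

final class NetworkDistanceSketch {

    /** Splits a location such as "/d1/r1/n1" into its components [d1, r1, n1]. */
    static List<String> components(String location) {
        return Arrays.asList(location.substring(1).split("/"));
    }

    /**
     * Distance between two locations: the number of levels from each one up to
     * their closest common ancestor, added together.
     */
    static int distance(String a, String b) {
        List<String> pa = components(a);
        List<String> pb = components(b);

        // Count how many leading components (data center, rack, node) are shared.
        int common = 0;
        while (common < pa.size() && common < pb.size()
                && pa.get(common).equals(pb.get(common))) {
            common++;
        }
        return (pa.size() - common) + (pb.size() - common);
    }

    public static void main(String[] args) {
        System.out.println(distance("/d1/r1/n1", "/d1/r1/n1")); // same node
        System.out.println(distance("/d1/r1/n1", "/d1/r1/n2")); // same rack
        System.out.println(distance("/d1/r1/n1", "/d1/r2/n3")); // same data center
        System.out.println(distance("/d1/r1/n1", "/d2/r3/n4")); // different data centers
    }
}
```

The four calls in main correspond, in order, to the four scenarios listed above.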
Slide 89: This is illustrated schematically in Figure 3-2. (Mathematically inclined readers will notice that this is an example of a distance metric.)

Finally, it is important to realize that Hadoop cannot divine your network topology for you. It needs some help; we’ll cover how to configure topology in “Network Topology” on page 261. By default though, it assumes that the network is flat (a single-level hierarchy), or in other words, that all nodes are on a single rack in a single data center. For small clusters, this may actually be the case, and no further configuration is required.

Figure 3-2. Network distance in Hadoop

Anatomy of a File Write

Next we’ll look at how files are written to HDFS. Although the process is quite detailed, it is instructive to understand the data flow, since it clarifies HDFS’s coherency model.

We’ll consider the case of creating a new file, writing data to it, then closing the file. See Figure 3-3.

The client creates the file by calling create() on DistributedFileSystem (step 1 in Figure 3-3). DistributedFileSystem makes an RPC call to the namenode to create a new file in the filesystem’s namespace, with no blocks associated with it (step 2). The namenode performs various checks to make sure the file doesn’t already exist, and that the client has the right permissions to create the file. If these checks pass, the namenode makes a record of the new file; otherwise, file creation fails and the client is thrown an IOException. The DistributedFileSystem returns an FSDataOutputStream for the client
Slide 90: to start writing data to. Just as in the read case, FSDataOutputStream wraps a DFSOutputStream, which handles communication with the datanodes and namenode.

As the client writes data (step 3), DFSOutputStream splits it into packets, which it writes to an internal queue, called the data queue. The data queue is consumed by the DataStreamer, whose responsibility it is to ask the namenode to allocate new blocks by picking a list of suitable datanodes to store the replicas. The list of datanodes forms a pipeline; we’ll assume the replication level is three, so there are three nodes in the pipeline. The DataStreamer streams the packets to the first datanode in the pipeline, which stores the packet and forwards it to the second datanode in the pipeline. Similarly, the second datanode stores the packet and forwards it to the third (and last) datanode in the pipeline (step 4).

Figure 3-3. A client writing data to HDFS

DFSOutputStream also maintains an internal queue of packets that are waiting to be acknowledged by datanodes, called the ack queue. A packet is removed from the ack queue only when it has been acknowledged by all the datanodes in the pipeline (step 5).

If a datanode fails while data is being written to it, then the following actions are taken, which are transparent to the client writing the data. First the pipeline is closed, and any packets in the ack queue are added to the front of the data queue so that datanodes that are downstream from the failed node will not miss any packets. The current block on the good datanodes is given a new identity, which is communicated to the namenode, so that the partial block on the failed datanode will be deleted if the failed
Slide 91: datanode recovers later on. The failed datanode is removed from the pipeline and the remainder of the block’s data is written to the two good datanodes in the pipeline. The namenode notices that the block is under-replicated, and it arranges for a further replica to be created on another node. Subsequent blocks are then treated as normal.

It’s possible, but unlikely, that multiple datanodes fail while a block is being written. As long as dfs.replication.min replicas (default one) are written, the write will succeed, and the block will be asynchronously replicated across the cluster until its target replication factor is reached (dfs.replication, which defaults to three).

When the client has finished writing data, it calls close() on the stream (step 6). This action flushes all the remaining packets to the datanode pipeline and waits for acknowledgments before contacting the namenode to signal that the file is complete (step 7). The namenode already knows which blocks the file is made up of (via DataStreamer asking for block allocations), so it only has to wait for blocks to be minimally replicated before returning successfully.

Replica Placement

How does the namenode choose which datanodes to store replicas on? There’s a trade-off among reliability, write bandwidth, and read bandwidth here. For example, placing all replicas on a single node incurs the lowest write bandwidth penalty since the replication pipeline runs on a single node, but this offers no real redundancy (if the node fails, the data for that block is lost). Also, the read bandwidth is high for off-rack reads. At the other extreme, placing replicas in different data centers may maximize redundancy, but at the cost of bandwidth. Even in the same data center (which is what all Hadoop clusters to date have run in), there are a variety of placement strategies.
Indeed, Hadoop changed its placement strategy in release 0.17.0 to one that helps keep a fairly even distribution of blocks across the cluster. (See “balancer” on page 304 for details on keeping a cluster balanced.) And from 0.21.0, block placement policies are pluggable.

Hadoop’s default strategy is to place the first replica on the same node as the client (for clients running outside the cluster, a node is chosen at random, although the system tries not to pick nodes that are too full or too busy). The second replica is placed on a different rack from the first (off-rack), chosen at random. The third replica is placed on the same rack as the second, but on a different node chosen at random. Further replicas are placed on random nodes on the cluster, although the system tries to avoid placing too many replicas on the same rack.

Once the replica locations have been chosen, a pipeline is built, taking network topology into account. For a replication factor of 3, the pipeline might look like Figure 3-4.

Overall, this strategy gives a good balance among reliability (blocks are stored on two racks), write bandwidth (writes only have to traverse a single network switch), read performance (there’s a choice of two racks to read from), and block distribution across the cluster (clients only write a single block on the local rack).
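To make the default strategy for three replicas concrete, here is a toy simulation in plain Java. It is a sketch under simplifying assumptions, not the namenode’s placement code: the topology is a hypothetical map from rack name to node names, the client is assumed to run on a cluster node, node fullness and load are ignored, and the remote rack is assumed to hold at least two nodes. All names (ReplicaPlacementSketch, sampleTopology, chooseThree) are invented for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

final class ReplicaPlacementSketch {

    /** A hypothetical two-rack cluster used only for illustration. */
    static Map<String, List<String>> sampleTopology() {
        Map<String, List<String>> racks = new LinkedHashMap<>();
        racks.put("/d1/r1", Arrays.asList("n1", "n2"));
        racks.put("/d1/r2", Arrays.asList("n3", "n4", "n5"));
        return racks;
    }

    /** Returns the rack that hosts the given node. */
    static String rackOf(Map<String, List<String>> racks, String node) {
        for (Map.Entry<String, List<String>> e : racks.entrySet()) {
            if (e.getValue().contains(node)) {
                return e.getKey();
            }
        }
        throw new IllegalArgumentException("unknown node: " + node);
    }

    static String pick(List<String> candidates, Random rnd) {
        return candidates.get(rnd.nextInt(candidates.size()));
    }

    /** Chooses nodes for replicas 1, 2, and 3, in that order. */
    static List<String> chooseThree(Map<String, List<String>> racks,
                                    String clientNode, Random rnd) {
        String localRack = rackOf(racks, clientNode);

        // First replica: the node the client is running on.
        String first = clientNode;

        // Second replica: a random node on a random rack other than the client's.
        // Only racks with two or more nodes qualify, so the third replica fits.
        List<String> remoteRacks = new ArrayList<>();
        for (Map.Entry<String, List<String>> e : racks.entrySet()) {
            if (!e.getKey().equals(localRack) && e.getValue().size() >= 2) {
                remoteRacks.add(e.getKey());
            }
        }
        if (remoteRacks.isEmpty()) {
            throw new IllegalStateException("no suitable off-rack location");
        }
        String remoteRack = pick(remoteRacks, rnd);
        String second = pick(racks.get(remoteRack), rnd);

        // Third replica: same rack as the second, but a different node.
        List<String> others = new ArrayList<>(racks.get(remoteRack));
        others.remove(second);
        String third = pick(others, rnd);

        return Arrays.asList(first, second, third);
    }

    public static void main(String[] args) {
        System.out.println(chooseThree(sampleTopology(), "n1", new Random()));
    }
}
```

Whatever the random choices, the result keeps one replica on the client’s rack and two on a single remote rack, which is the two-rack layout the paragraph above describes.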
Slide 92: Figure 3-4. A typical replica pipeline

Coherency Model

A coherency model for a filesystem describes the data visibility of reads and writes for a file. HDFS trades off some POSIX requirements for performance, so some operations may behave differently than you expect them to.

After creating a file, it is visible in the filesystem namespace, as expected:

    Path p = new Path("p");
    fs.create(p);
    assertThat(fs.exists(p), is(true));

However, any content written to the file is not guaranteed to be visible, even if the stream is flushed. So the file appears to have a length of zero:

    Path p = new Path("p");
    OutputStream out = fs.create(p);
    out.write("content".getBytes("UTF-8"));
    out.flush();
    assertThat(fs.getFileStatus(p).getLen(), is(0L));

Once more than a block’s worth of data has been written, the first block will be visible to new readers. This is true of subsequent blocks, too: it is always the current block being written that is not visible to other readers.
Slide 93: HDFS provides a method for forcing all buffers to be synchronized to the datanodes via the sync() method on FSDataOutputStream. After a successful return from sync(), HDFS guarantees that the data written up to that point in the file is persisted and visible to all new readers:‡

    Path p = new Path("p");
    FSDataOutputStream out = fs.create(p);
    out.write("content".getBytes("UTF-8"));
    out.flush();
    out.sync();
    assertThat(fs.getFileStatus(p).getLen(), is(((long) "content".length())));

This behavior is similar to the fsync system call in POSIX that commits buffered data for a file descriptor. For example, using the standard Java API to write a local file, we are guaranteed to see the content after flushing the stream and synchronizing:

    FileOutputStream out = new FileOutputStream(localFile);
    out.write("content".getBytes("UTF-8"));
    out.flush(); // flush to operating system
    out.getFD().sync(); // sync to disk
    assertThat(localFile.length(), is(((long) "content".length())));

Closing a file in HDFS performs an implicit sync(), too:

    Path p = new Path("p");
    OutputStream out = fs.create(p);
    out.write("content".getBytes("UTF-8"));
    out.close();
    assertThat(fs.getFileStatus(p).getLen(), is(((long) "content".length())));

Consequences for application design

This coherency model has implications for the way you design applications. With no calls to sync(), you should be prepared to lose up to a block of data in the event of client or system failure. For many applications, this is unacceptable, so you should call sync() at suitable points, such as after writing a certain number of records or number of bytes. Though the sync() operation is designed to not unduly tax HDFS, it does have some overhead, so there is a trade-off between data robustness and throughput. What is an acceptable trade-off is application-dependent, and suitable values can be selected after measuring your application’s performance with different sync() frequencies.
‡ Releases of Hadoop up to and including 0.20 do not have a working implementation of sync(); however, this has been remedied from 0.21.0 onward. Also, from that version, sync() is deprecated in favor of hflush(), which only guarantees that new readers will see all data written to that point, and hsync(), which makes a stronger guarantee that the operating system has flushed the data to disk (like POSIX fsync), although data may still be in the disk cache.
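The advice to sync after a certain number of records can be sketched as follows. To keep the example runnable without a cluster, it uses the standard Java local-file API from the fsync comparison above as a stand-in; it does not call HDFS. On HDFS the same loop would, per the text, call sync() (or hflush() in later releases) on the FSDataOutputStream instead. The class name, method name, and the syncEvery parameter are hypothetical.

```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

final class PeriodicSyncSketch {

    /**
     * Writes each record followed by a newline and forces buffered data to disk
     * after every syncEvery records. Returns the number of explicit syncs made.
     */
    static int writeWithPeriodicSync(File file, String[] records, int syncEvery)
            throws IOException {
        int syncs = 0;
        try (FileOutputStream out = new FileOutputStream(file)) {
            for (int i = 0; i < records.length; i++) {
                out.write((records[i] + "\n").getBytes(StandardCharsets.UTF_8));
                if ((i + 1) % syncEvery == 0) {
                    out.flush();        // flush to operating system
                    out.getFD().sync(); // sync to disk
                    syncs++;
                }
            }
        } // the stream is closed here, which covers any trailing records
        return syncs;
    }

    public static void main(String[] args) throws IOException {
        File f = File.createTempFile("sync-sketch", ".txt");
        f.deleteOnExit();
        String[] records = {"r1", "r2", "r3", "r4", "r5"};
        int syncs = writeWithPeriodicSync(f, records, 2);
        System.out.println(syncs + " explicit syncs, " + f.length() + " bytes written");
    }
}
```

A smaller syncEvery bounds how many records can be lost on failure at the cost of more sync calls, which is the robustness versus throughput trade-off described above; a suitable value is best found by measurement.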
Slide 94: Parallel Copying with distcp The HDFS access patterns that we have seen so far focus on single-threaded access. It’s possible to act on a collection of files, by specifying file globs, for example, but for efficient, parallel processing of these files you would have to write a program yourself. Hadoop comes with a useful program called distcp for copying large amounts of data to and from Hadoop filesystems in parallel. The canonical use case for distcp is for transferring data between two HDFS clusters. If the clusters are running identical versions of Hadoop, the hdfs scheme is appropriate: % hadoop distcp hdfs://namenode1/foo hdfs://namenode2/bar This will copy the /foo directory (and its contents) from the first cluster to the /bar directory on the second cluster, so the second cluster ends up with the directory structure /bar/foo. If /bar doesn’t exist, it will be created first. You can specify multiple source paths, and all will be copied to the destination. Source paths must be absolute. By default, distcp will skip files that already exist in the destination, but they can be overwritten by supplying the -overwrite option. You can also update only files that have changed using the -update option. Using either (or both) of -overwrite or -update changes how the source and destination paths are interpreted. This is best shown with an example. If we changed a file in the /foo subtree on the first cluster from the previous example, then we could synchronize the change with the second cluster by running: % hadoop distcp -update hdfs://namenode1/foo hdfs://namenode2/bar/foo The extra trailing /foo subdirectory is needed on the destination, as now the contents of the source directory are copied to the contents of the destination directory. (If you are familiar with rsync, you can think of the -overwrite or -update options as adding an implicit trailing slash to the source.) 
If you are unsure of the effect of a distcp operation, it is a good idea to try it out on a small test directory tree first.

There are more options to control the behavior of distcp, including ones to preserve file attributes, ignore failures, and limit the number of files or total data copied. Run it with no options to see the usage instructions.

distcp is implemented as a MapReduce job where the work of copying is done by the maps that run in parallel across the cluster. There are no reducers. Each file is copied by a single map, and distcp tries to give each map approximately the same amount of data, by bucketing files into roughly equal allocations.
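How many maps distcp uses follows a simple sizing rule, detailed in the next paragraph: at least 256 MB per map, capped by default at 20 maps per cluster node. The sketch below only models that arithmetic as stated in this text; it is not taken from the distcp source, and the class, constant, and method names are illustrative.

```java
final class DistcpMapCountSketch {

    static final long MB = 1024L * 1024L;
    static final long GB = 1024L * MB;

    // Figures quoted in the text: 256 MB minimum per map, 20 maps per node.
    static final long BYTES_PER_MAP = 256L * MB;
    static final int MAX_MAPS_PER_NODE = 20;

    /** Number of maps for a copy of totalBytes onto a cluster of clusterNodes. */
    static long numMaps(long totalBytes, int clusterNodes) {
        // Enough maps that each copies about 256 MB (rounded up).
        long bySize = (totalBytes + BYTES_PER_MAP - 1) / BYTES_PER_MAP;
        // Default upper limit for large copies.
        long cap = (long) MAX_MAPS_PER_NODE * clusterNodes;
        // Always at least one map, even for small inputs.
        return Math.max(1L, Math.min(bySize, cap));
    }

    public static void main(String[] args) {
        System.out.println(numMaps(1L * GB, 100));     // small copy
        System.out.println(numMaps(1000L * GB, 100));  // large copy, hits the cap
    }
}
```

For the large copy the cap applies, so the average amount per map is simply the total size divided by the capped map count.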
Slide 95: The number of maps is decided as follows. Since it’s a good idea to get each map to copy a reasonable amount of data to minimize overheads in task setup, each map copies at least 256 MB (unless the total size of the input is less, in which case one map handles it all). For example, 1 GB of files will be given four map tasks. When the data size is very large, it becomes necessary to limit the number of maps in order to limit bandwidth and cluster utilization. By default, the maximum number of maps is 20 per (tasktracker) cluster node. For example, copying 1,000 GB of files to a 100-node cluster will allocate 2,000 maps (20 per node), so each will copy 512 MB on average. This can be reduced by specifying the -m argument to distcp. For example, -m 1000 would allocate 1,000 maps, each copying 1 GB on average. If you try to use distcp between two HDFS clusters that are running different versions, the copy will fail if you use the hdfs protocol, since the RPC systems are incompatible. To remedy this, you can use the read-only HTTP-based HFTP filesystem to read from the source. The job must run on the destination cluster so that the HDFS RPC versions are compatible. To repeat the previous example using HFTP: % hadoop distcp hftp://namenode1:50070/foo hdfs://namenode2/bar Note that you need to specify the namenode’s web port in the source URI. This is determined by the dfs.http.address property, which defaults to 50070. Keeping an HDFS Cluster Balanced When copying data into HDFS, it’s important to consider cluster balance. HDFS works best when the file blocks are evenly spread across the cluster, so you want to ensure that distcp doesn’t disrupt this. Going back to the 1,000 GB example, by specifying -m 1 a single map would do the copy, which—apart from being slow and not using the cluster resources efficiently—would mean that the first replica of each block would reside on the node running the map (until the disk filled up). 
The second and third replicas would be spread across the cluster, but this one node would be unbalanced. By having more maps than nodes in the cluster, this problem is avoided. For this reason, it’s best to start by running distcp with the default of 20 maps per node.

However, it’s not always possible to prevent a cluster from becoming unbalanced. Perhaps you want to limit the number of maps so that some of the nodes can be used by other jobs. In this case, you can use the balancer tool (see “balancer” on page 304) to subsequently even out the block distribution across the cluster.

Hadoop Archives

HDFS stores small files inefficiently, since each file is stored in a block, and block metadata is held in memory by the namenode. Thus, a large number of small files can eat up a lot of memory on the namenode. (Note, however, that small files do not take up any more disk space than is required to store the raw contents of the file. For
Slide 96: example, a 1 MB file stored with a block size of 128 MB uses 1 MB of disk space, not 128 MB.)

Hadoop Archives, or HAR files, are a file archiving facility that packs files into HDFS blocks more efficiently, thereby reducing namenode memory usage while still allowing transparent access to files. In particular, Hadoop Archives can be used as input to MapReduce.

Using Hadoop Archives

A Hadoop Archive is created from a collection of files using the archive tool. The tool runs a MapReduce job to process the input files in parallel, so you need a running MapReduce cluster to use it. Here are some files in HDFS that we would like to archive:

    % hadoop fs -lsr /my/files
    -rw-r--r--   1 tom supergroup          1 2009-04-09 19:13 /my/files/a
    drwxr-xr-x   - tom supergroup          0 2009-04-09 19:13 /my/files/dir
    -rw-r--r--   1 tom supergroup          1 2009-04-09 19:13 /my/files/dir/b

Now we can run the archive command:

    % hadoop archive -archiveName files.har /my/files /my

The first option is the name of the archive, here files.har. HAR files always have a .har extension, which is mandatory for reasons we shall see later. Next come the files to put in the archive. Here we are archiving only one source tree, the files in /my/files in HDFS, but the tool accepts multiple source trees. The final argument is the output directory for the HAR file. Let’s see what the archive has created:

    % hadoop fs -ls /my
    Found 2 items
    drwxr-xr-x   - tom supergroup          0 2009-04-09 19:13 /my/files
    drwxr-xr-x   - tom supergroup          0 2009-04-09 19:13 /my/files.har
    % hadoop fs -ls /my/files.har
    Found 3 items
    -rw-r--r--  10 tom supergroup        165 2009-04-09 19:13 /my/files.har/_index
    -rw-r--r--  10 tom supergroup         23 2009-04-09 19:13 /my/files.har/_masterindex
    -rw-r--r--   1 tom supergroup          2 2009-04-09 19:13 /my/files.har/part-0

The directory listing shows what a HAR file is made of: two index files and a collection of part files (just one in this example).
The part files contain the contents of a number of the original files concatenated together, and the indexes make it possible to look up the part file that an archived file is contained in, and its offset and length. All these details are hidden from the application, however, which uses the har URI scheme to interact with HAR files, using a HAR filesystem that is layered on top of the underlying filesystem (HDFS in this case). The following command recursively lists the files in the archive:
Slide 97:

    % hadoop fs -lsr har:///my/files.har
    drw-r--r--   - tom supergroup          0 2009-04-09 19:13 /my/files.har/my
    drw-r--r--   - tom supergroup          0 2009-04-09 19:13 /my/files.har/my/files
    -rw-r--r--  10 tom supergroup          1 2009-04-09 19:13 /my/files.har/my/files/a
    drw-r--r--   - tom supergroup          0 2009-04-09 19:13 /my/files.har/my/files/dir
    -rw-r--r--  10 tom supergroup          1 2009-04-09 19:13 /my/files.har/my/files/dir/b

This is quite straightforward if the filesystem that the HAR file is on is the default filesystem. On the other hand, if you want to refer to a HAR file on a different filesystem, then you need to use a different form of the path URI from the normal one. These two commands have the same effect, for example:

    % hadoop fs -lsr har:///my/files.har/my/files/dir
    % hadoop fs -lsr har://hdfs-localhost:8020/my/files.har/my/files/dir

Notice in the second form that the scheme is still har to signify a HAR filesystem, but the authority is hdfs to specify the underlying filesystem’s scheme, followed by a dash and the HDFS host (localhost) and port (8020). We can now see why HAR files have to have a .har extension. The HAR filesystem translates the har URI into a URI for the underlying filesystem, by looking at the authority and path up to and including the component with the .har extension. In this case, it is hdfs://localhost:8020/my/files.har. The remaining part of the path is the path of the file in the archive: /my/files/dir.

To delete a HAR file, you need to use the recursive form of delete, since from the underlying filesystem’s point of view the HAR file is a directory:

    % hadoop fs -rmr /my/files.har

Limitations

There are a few limitations to be aware of with HAR files. Creating an archive creates a copy of the original files, so you need as much disk space as the files you are archiving to create the archive (although you can delete the originals once you have created the archive).
There is currently no support for archive compression, although the files that go into the archive can be compressed (HAR files are like tar files in this respect).

Archives are immutable once they have been created. To add or remove files, you must re-create the archive. In practice, this is not a problem for files that don’t change after being written, since they can be archived in batches on a regular basis, such as daily or weekly.

As noted earlier, HAR files can be used as input to MapReduce. However, there is no archive-aware InputFormat that can pack multiple files into a single MapReduce split, so processing lots of small files, even in a HAR file, can still be inefficient. “Small files and CombineFileInputFormat” on page 203 discusses another approach to this problem.
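The har URI translation described earlier in this section (authority split at the dash, path cut after the component with the .har extension) can be illustrated with a small string-handling sketch. It is not the HAR filesystem’s implementation: the class and method names are hypothetical, only the first component ending in .har is considered, and for the har:/// form (no authority) the sketch returns the bare archive path without resolving it against a default filesystem.

```java
import java.net.URI;

final class HarUriSketch {

    /**
     * Splits a har URI into two parts:
     *   [0] the location of the archive on the underlying filesystem
     *   [1] the path of the file inside the archive
     */
    static String[] split(String harUri) {
        URI uri = URI.create(harUri);
        if (!"har".equals(uri.getScheme())) {
            throw new IllegalArgumentException("not a har URI: " + harUri);
        }

        // Locate the end of the path component that carries the .har extension.
        String path = uri.getPath();
        int end;
        int idx = path.indexOf(".har/");
        if (idx >= 0) {
            end = idx + ".har".length();
        } else if (path.endsWith(".har")) {
            end = path.length();
        } else {
            throw new IllegalArgumentException("no .har component in: " + harUri);
        }
        String archivePath = path.substring(0, end);
        String inside = path.substring(end);
        if (inside.isEmpty()) {
            inside = "/";
        }

        // An authority such as "hdfs-localhost:8020" encodes the underlying
        // scheme, then a dash, then the host and port.
        String prefix = "";
        String authority = uri.getAuthority();
        if (authority != null) {
            int dash = authority.indexOf('-');
            if (dash < 0) {
                throw new IllegalArgumentException("authority lacks scheme prefix: " + authority);
            }
            prefix = authority.substring(0, dash) + "://" + authority.substring(dash + 1);
        }
        return new String[] {prefix + archivePath, inside};
    }

    public static void main(String[] args) {
        String[] parts = split("har://hdfs-localhost:8020/my/files.har/my/files/dir");
        System.out.println("archive:        " + parts[0]);
        System.out.println("inside archive: " + parts[1]);
    }
}
```

For the URI from the example, the two parts are the underlying location hdfs://localhost:8020/my/files.har and the in-archive path /my/files/dir, matching the translation described in the text.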
Slide 99: CHAPTER 4

Hadoop I/O

Hadoop comes with a set of primitives for data I/O. Some of these are techniques that are more general than Hadoop, such as data integrity and compression, but deserve special consideration when dealing with multiterabyte datasets. Others are Hadoop tools or APIs that form the building blocks for developing distributed systems, such as serialization frameworks and on-disk data structures.

Data Integrity

Users of Hadoop rightly expect that no data will be lost or corrupted during storage or processing. However, since every I/O operation on the disk or network carries with it a small chance of introducing errors into the data that it is reading or writing, when the volumes of data flowing through the system are as large as the ones Hadoop is capable of handling, the chance of data corruption occurring is high.

The usual way of detecting corrupted data is by computing a checksum for the data when it first enters the system, and again whenever it is transmitted across a channel that is unreliable and hence capable of corrupting the data. The data is deemed to be corrupt if the newly generated checksum doesn’t exactly match the original. This technique doesn’t offer any way to fix the data; it only detects errors. (And this is a reason for not using low-end hardware; in particular, be sure to use ECC memory.) Note that it is possible that it’s the checksum that is corrupt, not the data, but this is very unlikely, since the checksum is much smaller than the data.

A commonly used error-detecting code is CRC-32 (cyclic redundancy check), which computes a 32-bit integer checksum for input of any size.

Data Integrity in HDFS

HDFS transparently checksums all data written to it and by default verifies checksums when reading data. A separate checksum is created for every io.bytes.per.checksum
Slide 100: bytes of data. The default is 512 bytes, and since a CRC-32 checksum is 4 bytes long, the storage overhead is less than 1%.

Datanodes are responsible for verifying the data they receive before storing the data and its checksum. This applies to data that they receive from clients and from other datanodes during replication. A client writing data sends it to a pipeline of datanodes (as explained in Chapter 3), and the last datanode in the pipeline verifies the checksum. If it detects an error, the client receives a ChecksumException, a subclass of IOException, which it should handle in an application-specific manner, by retrying the operation, for example.

When clients read data from datanodes, they verify checksums as well, comparing them with the ones stored at the datanode. Each datanode keeps a persistent log of checksum verifications, so it knows the last time each of its blocks was verified. When a client successfully verifies a block, it tells the datanode, which updates its log. Keeping statistics such as these is valuable in detecting bad disks.

Aside from block verification on client reads, each datanode runs a DataBlockScanner in a background thread that periodically verifies all the blocks stored on the datanode. This is to guard against corruption due to “bit rot” in the physical storage media. See “Datanode block scanner” on page 303 for details on how to access the scanner reports.

Since HDFS stores replicas of blocks, it can “heal” corrupted blocks by copying one of the good replicas to produce a new, uncorrupt replica. The way this works is that if a client detects an error when reading a block, it reports the bad block and the datanode it was trying to read from to the namenode before throwing a ChecksumException. The namenode marks the block replica as corrupt, so it doesn’t direct clients to it, or try to copy this replica to another datanode.
It then schedules a copy of the block to be replicated on another datanode, so its replication factor is back at the expected level. Once this has happened, the corrupt replica is deleted.

It is possible to disable verification of checksums by passing false to the setVerifyChecksum() method on FileSystem, before using the open() method to read a file. The same effect is possible from the shell by using the -ignoreCrc option with the -get or the equivalent -copyToLocal command. This feature is useful if you have a corrupt file that you want to inspect so you can decide what to do with it. For example, you might want to see whether it can be salvaged before you delete it.

LocalFileSystem

The Hadoop LocalFileSystem performs client-side checksumming. This means that when you write a file called filename, the filesystem client transparently creates a hidden file, .filename.crc, in the same directory containing the checksums for each chunk of the file. Like HDFS, the chunk size is controlled by the io.bytes.per.checksum property, which defaults to 512 bytes. The chunk size is stored as metadata in the .crc file, so the
Slide 101: file can be read back correctly even if the setting for the chunk size has changed. Checksums are verified when the file is read, and if an error is detected, LocalFileSystem throws a ChecksumException.

Checksums are fairly cheap to compute (in Java, they are implemented in native code), typically adding a few percent overhead to the time to read or write a file. For most applications, this is an acceptable price to pay for data integrity. It is, however, possible to disable checksums: typically when the underlying filesystem supports checksums natively. This is accomplished by using RawLocalFileSystem in place of LocalFileSystem. To do this globally in an application, it suffices to remap the implementation for file URIs by setting the property fs.file.impl to the value org.apache.hadoop.fs.RawLocalFileSystem. Alternatively, you can directly create a RawLocalFileSystem instance, which may be useful if you want to disable checksum verification for only some reads; for example:

    Configuration conf = ...
    FileSystem fs = new RawLocalFileSystem();
    fs.initialize(null, conf);

ChecksumFileSystem

LocalFileSystem uses ChecksumFileSystem to do its work, and this class makes it easy to add checksumming to other (nonchecksummed) filesystems, as ChecksumFileSystem is just a wrapper around FileSystem. The general idiom is as follows:

    FileSystem rawFs = ...
    FileSystem checksummedFs = new ChecksumFileSystem(rawFs);

The underlying filesystem is called the raw filesystem, and may be retrieved using the getRawFileSystem() method on ChecksumFileSystem. ChecksumFileSystem has a few more useful methods for working with checksums, such as getChecksumFile() for getting the path of a checksum file for any file. Check the documentation for the others.

If an error is detected by ChecksumFileSystem when reading a file, it will call its reportChecksumFailure() method.
The default implementation does nothing, but LocalFileSystem moves the offending file and its checksum to a side directory on the same device called bad_files. Administrators should periodically check for these bad files and take action on them. Compression File compression brings two major benefits: it reduces the space needed to store files, and it speeds up data transfer across the network, or to or from disk. When dealing with large volumes of data, both of these savings can be significant, so it pays to carefully consider how to use compression in Hadoop. Compression | 77
Slide 102: There are many different compression formats, tools and algorithms, each with different characteristics. Table 4-1 lists some of the more common ones that can be used with Hadoop.*

Table 4-1. A summary of compression formats

    Compression format   Tool    Algorithm   Filename extension   Multiple files   Splittable
    DEFLATE (a)          N/A     DEFLATE     .deflate             No               No
    gzip                 gzip    DEFLATE     .gz                  No               No
    bzip2                bzip2   bzip2       .bz2                 No               Yes
    LZO                  lzop    LZO         .lzo                 No               No

    (a) DEFLATE is a compression algorithm whose standard implementation is zlib. There is no commonly available command-line tool for producing files in DEFLATE format, as gzip is normally used. (Note that the gzip file format is DEFLATE with extra headers and a footer.) The .deflate filename extension is a Hadoop convention.

All compression algorithms exhibit a space/time trade-off: faster compression and decompression speeds usually come at the expense of smaller space savings. All of the tools listed in Table 4-1 give some control over this trade-off at compression time by offering nine different options: -1 means optimize for speed and -9 means optimize for space. For example, the following command creates a compressed file file.gz using the fastest compression method:

    gzip -1 file

The different tools have very different compression characteristics. Gzip is a general-purpose compressor, and sits in the middle of the space/time trade-off. Bzip2 compresses more effectively than gzip, but is slower. Bzip2’s decompression speed is faster than its compression speed, but it is still slower than the other formats. LZO, on the other hand, optimizes for speed: it is faster than gzip (or any other compression or decompression tool†), but compresses slightly less effectively.

The “Splittable” column in Table 4-1 indicates whether the compression format supports splitting; that is, whether you can seek to any point in the stream and start reading from some point further on.
Splittable compression formats are especially suitable for MapReduce; see “Compression and Input Splits” on page 83 for further discussion.

Codecs

A codec is the implementation of a compression-decompression algorithm. In Hadoop, a codec is represented by an implementation of the CompressionCodec interface. So, for example, GzipCodec encapsulates the compression and decompression algorithm for gzip. Table 4-2 lists the codecs that are available for Hadoop.

* At the time of this writing, Hadoop does not support ZIP compression. See https://issues.apache.org/jira/browse/MAPREDUCE-210.
† Jeff Gilchrist’s Archive Comparison Test at http://compression.ca/act/act-summary.html contains benchmarks for compression and decompression speed, and compression ratio for a wide range of tools.

Slide 103: Table 4-2. Hadoop compression codecs

    Compression format  Hadoop CompressionCodec
    DEFLATE             org.apache.hadoop.io.compress.DefaultCodec
    gzip                org.apache.hadoop.io.compress.GzipCodec
    bzip2               org.apache.hadoop.io.compress.BZip2Codec
    LZO                 com.hadoop.compression.lzo.LzopCodec

The LZO libraries are GPL-licensed and may not be included in Apache distributions, so the Hadoop codecs must be downloaded separately from http://code.google.com/p/hadoop-gpl-compression/ (or http://github.com/kevinweil/hadoop-lzo, which includes bugfixes and more tools). The LzopCodec is compatible with the lzop tool, which is essentially the LZO format with extra headers, and is the one you normally want. There is also a LzoCodec for the pure LZO format, which uses the .lzo_deflate filename extension (by analogy with DEFLATE, which is gzip without the headers).

Compressing and decompressing streams with CompressionCodec

CompressionCodec has two methods that allow you to easily compress or decompress data. To compress data being written to an output stream, use the createOutputStream(OutputStream out) method to create a CompressionOutputStream to which you write your uncompressed data to have it written in compressed form to the underlying stream. Conversely, to decompress data being read from an input stream, call createInputStream(InputStream in) to obtain a CompressionInputStream, which allows you to read uncompressed data from the underlying stream.
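The same wrap-a-stream pattern exists in the JDK, which makes it easy to experiment with outside a Hadoop installation. The following is a rough analog only: it uses java.util.zip.GZIPOutputStream and GZIPInputStream in place of the streams a CompressionCodec would hand back, and the class name GzipStreamRoundTrip is invented for the illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class GzipStreamRoundTrip {
    /** Writes uncompressed bytes into a compressing wrapper; the underlying stream receives gzip data. */
    public static byte[] compress(byte[] raw) {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        try (GZIPOutputStream out = new GZIPOutputStream(sink)) {
            out.write(raw);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return sink.toByteArray();
    }

    /** Reads uncompressed bytes out of a decompressing wrapper around the gzip data. */
    public static byte[] decompress(byte[] compressed) {
        ByteArrayOutputStream result = new ByteArrayOutputStream();
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            byte[] buffer = new byte[4096];
            int n;
            while ((n = in.read(buffer)) != -1) {
                result.write(buffer, 0, n);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return result.toByteArray();
    }

    public static void main(String[] args) {
        byte[] original = "Text".getBytes(StandardCharsets.UTF_8);
        byte[] restored = decompress(compress(original));
        System.out.println(new String(restored, StandardCharsets.UTF_8));
    }
}
```

In both directions the caller only ever sees uncompressed bytes; the compressed form lives solely in the underlying stream.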
CompressionOutputStream and CompressionInputStream are similar to java.util.zip.DeflaterOutputStream and java.util.zip.InflaterInputStream, except that both of the former provide the ability to reset their underlying compressor or decompressor, which is important for applications that compress sections of the data stream as separate blocks, such as SequenceFile, described in “SequenceFile” on page 116.

Example 4-1 illustrates how to use the API to compress data read from standard input and write it to standard output.

Example 4-1. A program to compress data read from standard input and write it to standard output

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.CompressionOutputStream;
    import org.apache.hadoop.util.ReflectionUtils;

    public class StreamCompressor {

      public static void main(String[] args) throws Exception {
        String codecClassname = args[0];
        Class<?> codecClass = Class.forName(codecClassname);
        Configuration conf = new Configuration();
        CompressionCodec codec = (CompressionCodec)
            ReflectionUtils.newInstance(codecClass, conf);

        // Wrap System.out so that everything written to it is compressed
        CompressionOutputStream out = codec.createOutputStream(System.out);
        IOUtils.copyBytes(System.in, out, 4096, false);
        out.finish(); // flush the compressor without closing System.out
      }
    }

Slide 104: The application expects the fully qualified name of the CompressionCodec implementation as the first command-line argument. We use ReflectionUtils to construct a new instance of the codec, then obtain a compression wrapper around System.out. Then we call the utility method copyBytes() on IOUtils to copy the input to the output, which is compressed by the CompressionOutputStream. Finally, we call finish() on CompressionOutputStream, which tells the compressor to finish writing to the compressed stream, but doesn’t close the stream. We can try it out with the following command line, which compresses the string “Text” using the StreamCompressor program with the GzipCodec, then decompresses it from standard input using gunzip:

    % echo "Text" | hadoop StreamCompressor org.apache.hadoop.io.compress.GzipCodec \
      | gunzip
    Text

Inferring CompressionCodecs using CompressionCodecFactory

If you are reading a compressed file, you can normally infer the codec to use by looking at its filename extension. A file ending in .gz can be read with GzipCodec, and so on. The extension for each compression format is listed in Table 4-1.

CompressionCodecFactory provides a way of mapping a filename extension to a CompressionCodec using its getCodec() method, which takes a Path object for the file in question. Example 4-2 shows an application that uses this feature to decompress files.

Example 4-2.
A program to decompress a compressed file using a codec inferred from the file’s extension

    import java.io.InputStream;
    import java.io.OutputStream;
    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.CompressionCodecFactory;

    public class FileDecompressor {

      public static void main(String[] args) throws Exception {
        String uri = args[0];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);

        Path inputPath = new Path(uri);
        CompressionCodecFactory factory = new CompressionCodecFactory(conf);
        CompressionCodec codec = factory.getCodec(inputPath);
        if (codec == null) {
          System.err.println("No codec found for " + uri);
          System.exit(1);
        }

        // Strip the codec's extension (e.g. ".gz") to form the output name
        String outputUri =
            CompressionCodecFactory.removeSuffix(uri, codec.getDefaultExtension());

        InputStream in = null;
        OutputStream out = null;
        try {
          in = codec.createInputStream(fs.open(inputPath));
          out = fs.create(new Path(outputUri));
          IOUtils.copyBytes(in, out, conf);
        } finally {
          IOUtils.closeStream(in);
          IOUtils.closeStream(out);
        }
      }
    }

Slide 105: Once the codec has been found, it is used to strip off the file suffix to form the output filename (via the removeSuffix() static method of CompressionCodecFactory). In this way, a file named file.gz is decompressed to file by invoking the program as follows:

    % hadoop FileDecompressor file.gz

CompressionCodecFactory finds codecs from a list defined by the io.compression.codecs configuration property. By default, this lists all the codecs provided by Hadoop (see Table 4-3), so you would need to alter it only if you have a custom codec that you wish to register (such as the externally hosted LZO codecs). Each codec knows its default filename extension, thus permitting CompressionCodecFactory to search through the registered codecs to find a match for a given extension (if any).

Table 4-3. Compression codec properties

    Property name:  io.compression.codecs
    Type:           comma-separated Class names
    Default value:  org.apache.hadoop.io.compress.DefaultCodec,
                    org.apache.hadoop.io.compress.GzipCodec,
                    org.apache.hadoop.io.compress.BZip2Codec
    Description:    A list of the CompressionCodec classes for compression/decompression.

Native libraries

For performance, it is preferable to use a native library for compression and decompression. For example, in one test, using the native gzip libraries reduced decompression times by up to 50% and compression times by around 10% (compared to the built-in Java implementation). Table 4-4 shows the availability of Java and native implementations for each compression format.
Not all formats have native implementations (bzip2, for example), whereas others are only available as a native implementation (LZO, for example).
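The extension-based lookup that CompressionCodecFactory performs can be pictured with a small standalone sketch. This is not Hadoop's implementation: the class ExtensionCodecLookup and its methods are hypothetical, and the only things taken from the text are the extensions from Table 4-1 and the codec class names from Table 4-2 (held here as plain strings so the code runs without Hadoop on the classpath).

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ExtensionCodecLookup {
    // Hypothetical registry: filename extension -> codec class name (values from Tables 4-1 and 4-2).
    private static final Map<String, String> CODEC_BY_EXTENSION = new LinkedHashMap<>();
    static {
        CODEC_BY_EXTENSION.put(".deflate", "org.apache.hadoop.io.compress.DefaultCodec");
        CODEC_BY_EXTENSION.put(".gz", "org.apache.hadoop.io.compress.GzipCodec");
        CODEC_BY_EXTENSION.put(".bz2", "org.apache.hadoop.io.compress.BZip2Codec");
    }

    /** Returns the codec class name registered for the file's extension, or null if none matches. */
    public static String codecFor(String filename) {
        for (Map.Entry<String, String> entry : CODEC_BY_EXTENSION.entrySet()) {
            if (filename.endsWith(entry.getKey())) {
                return entry.getValue();
            }
        }
        return null;
    }

    /** Strips the suffix if present; otherwise returns the filename unchanged. */
    public static String removeSuffix(String filename, String suffix) {
        return filename.endsWith(suffix)
            ? filename.substring(0, filename.length() - suffix.length())
            : filename;
    }

    public static void main(String[] args) {
        System.out.println(codecFor("file.gz"));
        System.out.println(removeSuffix("file.gz", ".gz"));
        System.out.println(codecFor("file.txt"));
    }
}
```

Registering an extra codec in this picture amounts to adding one more entry to the map, which is roughly the role the io.compression.codecs property plays for the real factory.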
Slide 106: Table 4-4. Compression library implementations

    Compression format  Java implementation  Native implementation
    DEFLATE             Yes                  Yes
    gzip                Yes                  Yes
    bzip2               Yes                  No
    LZO                 No                   Yes

Hadoop comes with prebuilt native compression libraries for 32- and 64-bit Linux, which you can find in the lib/native directory. For other platforms, you will need to compile the libraries yourself, following the instructions on the Hadoop wiki at http://wiki.apache.org/hadoop/NativeHadoop.

The native libraries are picked up using the Java system property java.library.path. The hadoop script in the bin directory sets this property for you, but if you don’t use this script, you will need to set the property in your application.

By default, Hadoop looks for native libraries for the platform it is running on, and loads them automatically if they are found. This means you don’t have to change any configuration settings to use the native libraries. In some circumstances, however, you may wish to disable use of native libraries, such as when you are debugging a compression-related problem. You can achieve this by setting the property hadoop.native.lib to false, which ensures that the built-in Java equivalents will be used (if they are available).

CodecPool. If you are using a native library and you are doing a lot of compression or decompression in your application, consider using CodecPool, which allows you to reuse compressors and decompressors, thereby amortizing the cost of creating these objects. The code in Example 4-3 shows the API, although in this program, which only creates a single Compressor, there is really no need to use a pool.

Example 4-3.
A program to compress data read from standard input and write it to standard output using a pooled compressor

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.compress.CodecPool;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.CompressionOutputStream;
    import org.apache.hadoop.io.compress.Compressor;
    import org.apache.hadoop.util.ReflectionUtils;

    public class PooledStreamCompressor {

      public static void main(String[] args) throws Exception {
        String codecClassname = args[0];
        Class<?> codecClass = Class.forName(codecClassname);
        Configuration conf = new Configuration();
        CompressionCodec codec = (CompressionCodec)
            ReflectionUtils.newInstance(codecClass, conf);
        Compressor compressor = null;
        try {
          // Borrow a compressor from the pool instead of creating a new one
          compressor = CodecPool.getCompressor(codec);
          CompressionOutputStream out =
              codec.createOutputStream(System.out, compressor);
          IOUtils.copyBytes(System.in, out, 4096, false);
          out.finish();
        } finally {
          // Always hand the compressor back, even if copying failed
          CodecPool.returnCompressor(compressor);
        }
      }
    }

Slide 107: We retrieve a Compressor instance from the pool for a given CompressionCodec, which we use in the codec’s overloaded createOutputStream() method. By using a finally block, we ensure that the compressor is returned to the pool even if there is an IOException while copying the bytes between the streams.

Compression and Input Splits

When considering how to compress data that will be processed by MapReduce, it is important to understand whether the compression format supports splitting. Consider an uncompressed file stored in HDFS whose size is 1 GB. With an HDFS block size of 64 MB, the file will be stored as 16 blocks, and a MapReduce job using this file as input will create 16 input splits, each processed independently as input to a separate map task.

Imagine now the file is a gzip-compressed file whose compressed size is 1 GB. As before, HDFS will store the file as 16 blocks. However, creating a split for each block won’t work since it is impossible to start reading at an arbitrary point in the gzip stream, and therefore impossible for a map task to read its split independently of the others. The gzip format uses DEFLATE to store the compressed data, and DEFLATE stores data as a series of compressed blocks. The problem is that the start of each block is not distinguished in any way that would allow a reader positioned at an arbitrary point in the stream to advance to the beginning of the next block, thereby synchronizing itself with the stream. For this reason, gzip does not support splitting.

In this case, MapReduce will do the right thing and not try to split the gzipped file, since it knows that the input is gzip-compressed (by looking at the filename extension) and that gzip does not support splitting. This will work, but at the expense of locality: a single map will process the 16 HDFS blocks, most of which will not be local to the map.
Also, with fewer maps, the job is less granular, and so may take longer to run.

If the file in our hypothetical example were an LZO file, we would have the same problem since the underlying compression format does not provide a way for a reader to synchronize itself with the stream.‡ A bzip2 file, however, does provide a synchronization marker between blocks (a 48-bit approximation of pi), so it does support splitting. (Table 4-1 lists whether each compression format supports splitting.)

‡ It is possible to preprocess gzip and LZO files to build an index of split points, effectively making them splittable. See https://issues.apache.org/jira/browse/MAPREDUCE-491 for gzip. For LZO, there is an indexer tool available with the Hadoop LZO libraries, which you can obtain from the site listed in “Codecs” on page 78.
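The arithmetic behind the 1 GB example can be written down in a few lines. The sketch below is a deliberate simplification (one split per HDFS block for splittable input, a single split otherwise) and does not reproduce the rules MapReduce actually uses to size splits; the class and method names are invented for the illustration.

```java
public class SplitEstimate {
    /** Number of HDFS blocks needed to store a file of the given size (ceiling division). */
    public static long blocks(long fileSizeBytes, long blockSizeBytes) {
        return (fileSizeBytes + blockSizeBytes - 1) / blockSizeBytes;
    }

    /**
     * Simplified estimate of map tasks: one per block when the format can be split,
     * and a single task reading every block when it cannot.
     */
    public static long mapTasks(long fileSizeBytes, long blockSizeBytes, boolean splittable) {
        return splittable ? blocks(fileSizeBytes, blockSizeBytes) : 1;
    }

    public static void main(String[] args) {
        long oneGb = 1024L * 1024 * 1024;
        long blockSize = 64L * 1024 * 1024;
        System.out.println("blocks:             " + blocks(oneGb, blockSize));
        System.out.println("maps (splittable):  " + mapTasks(oneGb, blockSize, true));
        System.out.println("maps (gzip or LZO): " + mapTasks(oneGb, blockSize, false));
    }
}
```

For the file in the text this gives 16 blocks either way, but 16 map tasks in the splittable case against one map task that has to read all 16 blocks in the unsplittable case.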
Slide 108: Which Compression Format Should I Use?

Which compression format you should use depends on your application. Do you want to maximize the speed of your application or are you more concerned about keeping storage costs down? In general, you should try different strategies for your application, and benchmark them with representative datasets to find the best approach.

For large, unbounded files, like logfiles, the options are:

• Store the files uncompressed.
• Use a compression format that supports splitting, like bzip2.
• Split the file into chunks in the application and compress each chunk separately using any supported compression format (it doesn’t matter whether it is splittable). In this case, you should choose the chunk size so that the compressed chunks are approximately the size of an HDFS block.
• Use a sequence file, which supports compression and splitting. See “SequenceFile” on page 116.
• Use an Avro data file, which supports compression and splitting, just like a sequence file, but has the added advantage of being readable and writable from many languages, not just Java. See “Avro data files” on page 109.

For large files, you should not use a compression format that does not support splitting on the whole file, since you lose locality and make MapReduce applications very inefficient.

For archival purposes, consider the Hadoop archive format (see “Hadoop Archives” on page 71), although it does not support compression.

Using Compression in MapReduce

As described in “Inferring CompressionCodecs using CompressionCodecFactory” on page 80, if your input files are compressed, they will be automatically decompressed as they are read by MapReduce, using the filename extension to determine the codec to use.

To compress the output of a MapReduce job, in the job configuration, set the mapred.output.compress property to true and the mapred.output.compression.codec property to the classname of the compression codec you want to use, as shown in Example 4-4.
Example 4-4. Application to run the maximum temperature job producing compressed output

    import java.io.IOException;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.GzipCodec;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;

    public class MaxTemperatureWithCompression {

      public static void main(String[] args) throws IOException {
        if (args.length != 2) {
          System.err.println("Usage: MaxTemperatureWithCompression <input path> " +
              "<output path>");
          System.exit(-1);
        }

        JobConf conf = new JobConf(MaxTemperatureWithCompression.class);
        conf.setJobName("Max temperature with output compression");

        FileInputFormat.addInputPath(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);

        // Compress the job output with gzip
        conf.setBoolean("mapred.output.compress", true);
        conf.setClass("mapred.output.compression.codec", GzipCodec.class,
            CompressionCodec.class);

        conf.setMapperClass(MaxTemperatureMapper.class);
        conf.setCombinerClass(MaxTemperatureReducer.class);
        conf.setReducerClass(MaxTemperatureReducer.class);

        JobClient.runJob(conf);
      }
    }

Slide 109: We run the program over compressed input (which doesn’t have to use the same compression format as the output, although it does in this example) as follows:

    % hadoop MaxTemperatureWithCompression input/ncdc/sample.txt.gz output

Each part of the final output is compressed; in this case, there is a single part:

    % gunzip -c output/part-00000.gz
    1949    111
    1950    22

If you are emitting sequence files for your output, then you can set the mapred.output.compression.type property to control the type of compression to use. The default is RECORD, which compresses individual records. Changing this to BLOCK, which compresses groups of records, is recommended since it compresses better (see “The SequenceFile format” on page 122).

Compressing map output

Even if your MapReduce application reads and writes uncompressed data, it may benefit from compressing the intermediate output of the map phase. Since the map output is written to disk and transferred across the network to the reducer nodes, by using a fast compressor such as LZO, you can get performance gains simply because the volume of data to transfer is reduced. The configuration properties to enable compression for map outputs and to set the compression format are shown in Table 4-5.
Slide 110: Table 4-5. Map output compression properties

    Property name                        Type     Default value                               Description
    mapred.compress.map.output           boolean  false                                       Compress map outputs.
    mapred.map.output.compression.codec  Class    org.apache.hadoop.io.compress.DefaultCodec  The compression codec to use for map outputs.

Here are the lines to add to enable gzip map output compression in your job:

    conf.setCompressMapOutput(true);
    conf.setMapOutputCompressorClass(GzipCodec.class);

Serialization

Serialization is the process of turning structured objects into a byte stream for transmission over a network or for writing to persistent storage. Deserialization is the reverse process of turning a byte stream back into a series of structured objects.

Serialization appears in two quite distinct areas of distributed data processing: for interprocess communication and for persistent storage.

In Hadoop, interprocess communication between nodes in the system is implemented using remote procedure calls (RPCs). The RPC protocol uses serialization to render the message into a binary stream to be sent to the remote node, which then deserializes the binary stream into the original message. In general, it is desirable that an RPC serialization format is:

Compact
    A compact format makes the best use of network bandwidth, which is the most scarce resource in a data center.

Fast
    Interprocess communication forms the backbone for a distributed system, so it is essential that there is as little performance overhead as possible for the serialization and deserialization process.

Extensible
    Protocols change over time to meet new requirements, so it should be straightforward to evolve the protocol in a controlled manner for clients and servers. For example, it should be possible to add a new argument to a method call, and have the new servers accept messages in the old format (without the new argument) from old clients.

Interoperable
    For some systems, it is desirable to be able to support clients that are written in different languages to the server, so the format needs to be designed to make this possible.
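To make the "compact" property concrete, here is a small stdlib-only comparison of two ways to turn the same integer into bytes. It is an illustration rather than a benchmark: the class name SerializedSizes is made up, and standard Java object serialization is used only as a familiar point of comparison.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;

public class SerializedSizes {
    /** Size of an int written as raw binary through java.io.DataOutput. */
    public static int rawBinarySize(int value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    /** Size of the same value written as a boxed Integer using standard Java serialization. */
    public static int javaSerializationSize(int value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(Integer.valueOf(value));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    public static void main(String[] args) {
        System.out.println("raw binary:         " + rawBinarySize(163) + " bytes");
        System.out.println("Java serialization: " + javaSerializationSize(163) + " bytes");
    }
}
```

The raw form is exactly four bytes, while the object stream also carries a stream header and a class description, which is the kind of per-message overhead a compact RPC format tries to avoid.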
Slide 111: On the face of it, the data format chosen for persistent storage would have different requirements from a serialization framework. After all, the lifespan of an RPC is less than a second, whereas persistent data may be read years after it was written. As it turns out, the four desirable properties of an RPC’s serialization format are also crucial for a persistent storage format. We want the storage format to be compact (to make efficient use of storage space), fast (so the overhead in reading or writing terabytes of data is minimal), extensible (so we can transparently read data written in an older format), and interoperable (so we can read or write persistent data using different languages).

Hadoop uses its own serialization format, Writables, which is certainly compact and fast, but not so easy to extend or use from languages other than Java. Since Writables are central to Hadoop (most MapReduce programs use them for their key and value types), we look at them in some depth in the next three sections, before looking at serialization frameworks in general, and then Avro (a serialization system that was designed to overcome some of the limitations of Writables) in more detail.

The Writable Interface

The Writable interface defines two methods: one for writing its state to a DataOutput binary stream, and one for reading its state from a DataInput binary stream:

    package org.apache.hadoop.io;

    import java.io.DataOutput;
    import java.io.DataInput;
    import java.io.IOException;

    public interface Writable {
      void write(DataOutput out) throws IOException;
      void readFields(DataInput in) throws IOException;
    }

Let’s look at a particular Writable to see what we can do with it. We will use IntWritable, a wrapper for a Java int.
We can create one and set its value using the set() method:

    IntWritable writable = new IntWritable();
    writable.set(163);

Equivalently, we can use the constructor that takes the integer value:

    IntWritable writable = new IntWritable(163);

To examine the serialized form of the IntWritable, we write a small helper method that wraps a java.io.ByteArrayOutputStream in a java.io.DataOutputStream (an implementation of java.io.DataOutput) to capture the bytes in the serialized stream:

    public static byte[] serialize(Writable writable) throws IOException {
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      DataOutputStream dataOut = new DataOutputStream(out);
      writable.write(dataOut);
      dataOut.close();
      return out.toByteArray();
    }

Slide 112: An integer is written using four bytes (as we see using JUnit 4 assertions):

    byte[] bytes = serialize(writable);
    assertThat(bytes.length, is(4));

The bytes are written in big-endian order (so the most significant byte is written to the stream first; this is dictated by the java.io.DataOutput interface), and we can see their hexadecimal representation by using a method on Hadoop’s StringUtils:

    assertThat(StringUtils.byteToHexString(bytes), is("000000a3"));

Let’s try deserialization. Again, we create a helper method to read a Writable object from a byte array:

    public static byte[] deserialize(Writable writable, byte[] bytes)
        throws IOException {
      ByteArrayInputStream in = new ByteArrayInputStream(bytes);
      DataInputStream dataIn = new DataInputStream(in);
      writable.readFields(dataIn);
      dataIn.close();
      return bytes;
    }

We construct a new, value-less, IntWritable, then call deserialize() to read from the output data that we just wrote. Then we check that its value, retrieved using the get() method, is the original value, 163:

    IntWritable newWritable = new IntWritable();
    deserialize(newWritable, bytes);
    assertThat(newWritable.get(), is(163));

WritableComparable and comparators

IntWritable implements the WritableComparable interface, which is just a subinterface of the Writable and java.lang.Comparable interfaces:

    package org.apache.hadoop.io;

    public interface WritableComparable<T> extends Writable, Comparable<T> {
    }

Comparison of types is crucial for MapReduce, where there is a sorting phase during which keys are compared with one another. One optimization that Hadoop provides is the RawComparator extension of Java’s Comparator:
Slide 113:

    package org.apache.hadoop.io;

    import java.util.Comparator;

    public interface RawComparator<T> extends Comparator<T> {

      public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);

    }

This interface permits implementors to compare records read from a stream without deserializing them into objects, thereby avoiding any overhead of object creation. For example, the comparator for IntWritables implements the raw compare() method by reading an integer from each of the byte arrays b1 and b2 and comparing them directly, from the given start positions (s1 and s2) and lengths (l1 and l2).

WritableComparator is a general-purpose implementation of RawComparator for WritableComparable classes. It provides two main functions. First, it provides a default implementation of the raw compare() method that deserializes the objects to be compared from the stream and invokes the object compare() method. Second, it acts as a factory for RawComparator instances (that Writable implementations have registered). For example, to obtain a comparator for IntWritable, we just use:

    RawComparator<IntWritable> comparator = WritableComparator.get(IntWritable.class);

The comparator can be used to compare two IntWritable objects:

    IntWritable w1 = new IntWritable(163);
    IntWritable w2 = new IntWritable(67);
    assertThat(comparator.compare(w1, w2), greaterThan(0));

or their serialized representations:

    byte[] b1 = serialize(w1);
    byte[] b2 = serialize(w2);
    assertThat(comparator.compare(b1, 0, b1.length, b2, 0, b2.length),
        greaterThan(0));

Writable Classes

Hadoop comes with a large selection of Writable classes in the org.apache.hadoop.io package. They form the class hierarchy shown in Figure 4-1.

Writable wrappers for Java primitives

There are Writable wrappers for all the Java primitive types (see Table 4-6) except short and char (both of which can be stored in an IntWritable). All have a get() and a set() method for retrieving and storing the wrapped value.
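The raw comparison described earlier for IntWritable (reading an integer straight out of each byte array) can be imitated with plain JDK classes. The sketch below is not Hadoop's comparator: RawIntCompare and its methods are hypothetical, and the four-byte big-endian layout is simply what java.io.DataOutput.writeInt produces.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class RawIntCompare {
    /** Serializes an int as four big-endian bytes, the layout DataOutput.writeInt uses. */
    public static byte[] serialize(int value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Decodes a big-endian int starting at the given offset, without allocating any objects. */
    public static int readInt(byte[] b, int start) {
        return ((b[start] & 0xff) << 24)
             | ((b[start + 1] & 0xff) << 16)
             | ((b[start + 2] & 0xff) << 8)
             |  (b[start + 3] & 0xff);
    }

    /** Compares two serialized ints in place, in the spirit of a raw compare() method. */
    public static int compareRaw(byte[] b1, int s1, byte[] b2, int s2) {
        return Integer.compare(readInt(b1, s1), readInt(b2, s2));
    }

    public static void main(String[] args) {
        byte[] b1 = serialize(163);
        byte[] b2 = serialize(67);
        System.out.println(compareRaw(b1, 0, b2, 0) > 0);
    }
}
```

Because the keys are never turned back into objects, a sort that calls a comparison like this many times avoids the allocation cost of deserializing both operands on every call.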
Slide 114: Figure 4-1. Writable class hierarchy

Table 4-6. Writable wrapper classes for Java primitives

    Java primitive  Writable implementation  Serialized size (bytes)
    boolean         BooleanWritable          1
    byte            ByteWritable             1
    int             IntWritable              4
                    VIntWritable             1–5
    float           FloatWritable            4
    long            LongWritable             8
                    VLongWritable            1–9
    double          DoubleWritable           8

Slide 115: When it comes to encoding integers, there is a choice between the fixed-length formats (IntWritable and LongWritable) and the variable-length formats (VIntWritable and VLongWritable). The variable-length formats use only a single byte to encode the value if it is small enough (between -112 and 127, inclusive); otherwise, they use the first byte to indicate whether the value is positive or negative, and how many bytes follow. For example, 163 requires two bytes:

    byte[] data = serialize(new VIntWritable(163));
    assertThat(StringUtils.byteToHexString(data), is("8fa3"));

How do you choose between a fixed-length and a variable-length encoding? Fixed-length encodings are good when the distribution of values is fairly uniform across the whole value space, such as a (well-designed) hash function. Most numeric variables tend to have nonuniform distributions, and on average the variable-length encoding will save space. Another advantage of variable-length encodings is that you can switch from VIntWritable to VLongWritable, since their encodings are actually the same. So by choosing a variable-length representation, you have room to grow without committing to an 8-byte long representation from the beginning.

Text

Text is a Writable for UTF-8 sequences. It can be thought of as the Writable equivalent of java.lang.String. Text is a replacement for the UTF8 class, which was deprecated because it didn’t support strings whose encoding was over 32,767 bytes, and because it used Java’s modified UTF-8.

The Text class uses an int (with a variable-length encoding) to store the number of bytes in the string encoding, so the maximum value is 2 GB. Furthermore, Text uses standard UTF-8, which makes it potentially easier to interoperate with other tools that understand UTF-8.

Indexing.
Because of its emphasis on using standard UTF-8, there are some differences between Text and the Java String class. Indexing for the Text class is in terms of position in the encoded byte sequence, not the Unicode character in the string, or the Java char code unit (as it is for String). For ASCII strings, these three concepts of index position coincide. Here is an example to demonstrate the use of the charAt() method:

    Text t = new Text("hadoop");
    assertThat(t.getLength(), is(6));
    assertThat(t.getBytes().length, is(6));

    assertThat(t.charAt(2), is((int) 'd'));
    assertThat("Out of bounds", t.charAt(100), is(-1));
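The distinction between a byte offset and a char index can be seen with nothing more than java.lang.String. The following sketch is an illustration only and does not use the Text class: Utf8Offsets.byteOffset is a hypothetical helper, and it assumes the given char index does not fall in the middle of a surrogate pair.

```java
import java.nio.charset.StandardCharsets;

public class Utf8Offsets {
    /**
     * Returns the position, in the UTF-8 encoding of s, of the char at charIndex.
     * Assumes charIndex does not fall in the middle of a surrogate pair.
     */
    public static int byteOffset(String s, int charIndex) {
        // Encode everything before the char and count the bytes it occupies.
        return s.substring(0, charIndex).getBytes(StandardCharsets.UTF_8).length;
    }

    public static void main(String[] args) {
        String ascii = "hadoop";
        String mixed = "A\u00DFc";   // U+00DF (sharp s) takes two bytes in UTF-8

        // For ASCII the two notions of position agree.
        System.out.println(ascii.indexOf('d') + " -> " + byteOffset(ascii, ascii.indexOf('d')));

        // After a multi-byte character they drift apart.
        System.out.println(mixed.indexOf('c') + " -> " + byteOffset(mixed, mixed.indexOf('c')));
    }
}
```

The first line prints the same number twice, while in the second the byte offset is one larger than the char index, because the two-byte character ahead of 'c' counts once for String and twice for a byte-indexed view.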
Slide 116: Notice that charAt() returns an int representing a Unicode code point, unlike the String variant that returns a char. Text also has a find() method, which is analogous to String’s indexOf():

    Text t = new Text("hadoop");
    assertThat("Find a substring", t.find("do"), is(2));
    assertThat("Finds first 'o'", t.find("o"), is(3));
    assertThat("Finds 'o' from position 4 or later", t.find("o", 4), is(4));
    assertThat("No match", t.find("pig"), is(-1));

Unicode. When we start using characters that are encoded with more than a single byte, the differences between Text and String become clear. Consider the Unicode characters shown in Table 4-7.§

Table 4-7. Unicode characters

    Unicode code point  Name                            UTF-8 code units  Java representation
    U+0041              LATIN CAPITAL LETTER A          41                \u0041
    U+00DF              LATIN SMALL LETTER SHARP S      c3 9f             \u00DF
    U+6771              N/A (a unified Han ideograph)   e6 9d b1          \u6771
    U+10400             DESERET CAPITAL LETTER LONG I   f0 90 90 80       \uD801\uDC00

All but the last character in the table, U+10400, can be expressed using a single Java char. U+10400 is a supplementary character and is represented by two Java chars, known as a surrogate pair. The tests in Example 4-5 show the differences between String and Text when processing a string of the four characters from Table 4-7.

Example 4-5.
Tests showing the differences between the String and Text classes

    import static org.hamcrest.CoreMatchers.is;
    import static org.junit.Assert.assertThat;

    import java.io.UnsupportedEncodingException;

    import org.apache.hadoop.io.Text;
    import org.junit.Test;

    public class StringTextComparisonTest {

      @Test
      public void string() throws UnsupportedEncodingException {

        String s = "\u0041\u00DF\u6771\uD801\uDC00";
        assertThat(s.length(), is(5));
        assertThat(s.getBytes("UTF-8").length, is(10));

        assertThat(s.indexOf("\u0041"), is(0));
        assertThat(s.indexOf("\u00DF"), is(1));
        assertThat(s.indexOf("\u6771"), is(2));
        assertThat(s.indexOf("\uD801\uDC00"), is(3));

        assertThat(s.charAt(0), is('\u0041'));
        assertThat(s.charAt(1), is('\u00DF'));
        assertThat(s.charAt(2), is('\u6771'));
        assertThat(s.charAt(3), is('\uD801'));
        assertThat(s.charAt(4), is('\uDC00'));

        assertThat(s.codePointAt(0), is(0x0041));
        assertThat(s.codePointAt(1), is(0x00DF));
        assertThat(s.codePointAt(2), is(0x6771));
        assertThat(s.codePointAt(3), is(0x10400));
      }

      @Test
      public void text() {

        Text t = new Text("\u0041\u00DF\u6771\uD801\uDC00");
        assertThat(t.getLength(), is(10));

        assertThat(t.find("\u0041"), is(0));
        assertThat(t.find("\u00DF"), is(1));
        assertThat(t.find("\u6771"), is(3));
        assertThat(t.find("\uD801\uDC00"), is(6));

        assertThat(t.charAt(0), is(0x0041));
        assertThat(t.charAt(1), is(0x00DF));
        assertThat(t.charAt(3), is(0x6771));
        assertThat(t.charAt(6), is(0x10400));
      }
    }

§ This example is based on one from the article Supplementary Characters in the Java Platform.

Slide 117: The test confirms that the length of a String is the number of char code units it contains (5, one from each of the first three characters in the string, and a surrogate pair from the last), whereas the length of a Text object is the number of bytes in its UTF-8 encoding (10 = 1+2+3+4). Similarly, the indexOf() method in String returns an index in char code units, and find() for Text is a byte offset.

The charAt() method in String returns the char code unit for the given index, which in the case of a surrogate pair will not represent a whole Unicode character. The codePointAt() method, indexed by char code unit, is needed to retrieve a single Unicode character represented as an int. In fact, the charAt() method in Text is more like the codePointAt() method than its namesake in String. The only difference is that it is indexed by byte offset.

Iteration. Iterating over the Unicode characters in Text is complicated by the use of byte offsets for indexing, since you can’t just increment the index. The idiom for iteration is a little obscure (see Example 4-6): turn the Text object into a java.nio.ByteBuffer, then repeatedly call the bytesToCodePoint() static method on Text with the buffer. This method extracts the next code point as an int and updates the position in the buffer. The end of the string is detected when bytesToCodePoint() returns -1.

Example 4-6.
Iterating over the characters in a Text object public class TextIterator { public static void main(String[] args) { Text t = new Text("\u0041\u00DF\u6771\uD801\uDC00"); ByteBuffer buf = ByteBuffer.wrap(t.getBytes(), 0, t.getLength()); Serialization | 93
Slide 118: int cp; while (buf.hasRemaining() && (cp = Text.bytesToCodePoint(buf)) != -1) { System.out.println(Integer.toHexString(cp)); } } } Running the program prints the code points for the four characters in the string: % hadoop TextIterator 41 df 6771 10400 Mutability. Another difference with String is that Text is mutable (like all Writable implementations in Hadoop, except NullWritable, which is a singleton). You can reuse a Text instance by calling one of the set() methods on it. For example: Text t = new Text("hadoop"); t.set("pig"); assertThat(t.getLength(), is(3)); assertThat(t.getBytes().length, is(3)); In some situations, the byte array returned by the getBytes() method may be longer than the length returned by getLength(): Text t = new Text("hadoop"); t.set(new Text("pig")); assertThat(t.getLength(), is(3)); assertThat("Byte length not shortened", t.getBytes().length, is(6)); This shows why it is imperative that you always call getLength() when calling getBytes(), so you know how much of the byte array is valid data. Resorting to String. Text doesn’t have as rich an API for manipulating strings as java.lang.String, so in many cases, you need to convert the Text object to a String. This is done in the usual way, using the toString() method: assertThat(new Text("hadoop").toString(), is("hadoop")); BytesWritable BytesWritable is a wrapper for an array of binary data. Its serialized format is an integer field (4 bytes) that specifies the number of bytes to follow, followed by the bytes themselves. For example, the byte array of length two with values 3 and 5 is serialized as a 4-byte integer (00000002) followed by the two bytes from the array (03 and 05): BytesWritable b = new BytesWritable(new byte[] { 3, 5 }); byte[] bytes = serialize(b); assertThat(StringUtils.byteToHexString(bytes), is("000000020305"));
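The length-prefixed layout described for BytesWritable can be reproduced with the JDK alone. The following is a minimal sketch, not Hadoop code: the class LengthPrefixedBytesSketch and its toHex() helper are hypothetical names introduced for illustration, and the sketch assumes the 4-byte length is written big-endian, which is what the hex string 000000020305 in the text implies.

```java
import java.nio.ByteBuffer;

// Plain-JDK sketch of the wire layout described above; it does not use Hadoop classes.
final class LengthPrefixedBytesSketch {

  // Writes a 4-byte big-endian length followed by the payload bytes.
  static byte[] serialize(byte[] payload) {
    return ByteBuffer.allocate(4 + payload.length)
        .putInt(payload.length) // ByteBuffer is big-endian by default
        .put(payload)
        .array();
  }

  // Hypothetical helper standing in for a hex formatter such as StringUtils.byteToHexString.
  static String toHex(byte[] bytes) {
    StringBuilder sb = new StringBuilder();
    for (byte b : bytes) {
      sb.append(String.format("%02x", b));
    }
    return sb.toString();
  }

  public static void main(String[] args) {
    // Same input as the BytesWritable example: the two bytes 3 and 5.
    System.out.println(toHex(serialize(new byte[] { 3, 5 }))); // prints 000000020305
  }
}
```

A reader of this format first consumes the four length bytes, then reads exactly that many payload bytes, which is why no terminator is needed.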
Slide 119: BytesWritable is mutable, and its value may be changed by calling its set() method. As with Text, the size of the byte array returned from the getBytes() method for BytesWritable—the capacity—may not reflect the actual size of the data stored in the BytesWritable. You can determine the size of the BytesWritable by calling getLength(). To demonstrate: b.setCapacity(11); assertThat(b.getLength(), is(2)); assertThat(b.getBytes().length, is(11)); NullWritable NullWritable is a special type of Writable, as it has a zero-length serialization. No bytes are written to, or read from, the stream. It is used as a placeholder; for example, in MapReduce, a key or a value can be declared as a NullWritable when you don’t need to use that position—it effectively stores a constant empty value. NullWritable can also be useful as a key in SequenceFile when you want to store a list of values, as opposed to key-value pairs. It is an immutable singleton: the instance can be retrieved by calling NullWritable.get(). ObjectWritable and GenericWritable ObjectWritable is a general-purpose wrapper for the following: Java primitives, String, enum, Writable, null, or arrays of any of these types. It is used in Hadoop RPC to marshal and unmarshal method arguments and return types. ObjectWritable is useful when a field can be of more than one type: for example, if the values in a SequenceFile have multiple types, then you can declare the value type as an ObjectWritable and wrap each type in an ObjectWritable. Being a general-purpose mechanism, it’s fairly wasteful of space since it writes the classname of the wrapped type every time it is serialized. In cases where the number of types is small and known ahead of time, this can be improved by having a static array of types, and using the index into the array as the serialized reference to the type. This is the approach that GenericWritable takes, and you have to subclass it to specify the types to support. 
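To see why an index into a static array of types is cheaper than a classname, here is a small JDK-only sketch. It is only loosely modeled on the idea behind ObjectWritable and GenericWritable and does not reproduce their actual wire formats; TypeTagSketch, TYPES, tagWithClassName() and tagWithIndex() are hypothetical names, and the 2-byte name length is an assumption made for the illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Illustrative only: compares two ways of recording a value's type next to its payload.
final class TypeTagSketch {

  // Fixed, ordered list of supported types; the position in the array is the serialized tag.
  static final Class<?>[] TYPES = { Integer.class, String.class };

  // Tags the payload with the full classname (assumed 2-byte length plus UTF-8 name).
  static byte[] tagWithClassName(Class<?> type, byte[] payload) {
    byte[] name = type.getName().getBytes(StandardCharsets.UTF_8);
    return ByteBuffer.allocate(2 + name.length + payload.length)
        .putShort((short) name.length)
        .put(name)
        .put(payload)
        .array();
  }

  // Tags the payload with a single byte: the index of the type in TYPES.
  static byte[] tagWithIndex(Class<?> type, byte[] payload) {
    for (int i = 0; i < TYPES.length; i++) {
      if (TYPES[i] == type) {
        return ByteBuffer.allocate(1 + payload.length).put((byte) i).put(payload).array();
      }
    }
    throw new IllegalArgumentException("Unsupported type: " + type.getName());
  }

  public static void main(String[] args) {
    byte[] payload = ByteBuffer.allocate(4).putInt(42).array();
    System.out.println(tagWithClassName(Integer.class, payload).length); // prints 23
    System.out.println(tagWithIndex(Integer.class, payload).length);     // prints 5
  }
}
```

The saving is paid for with flexibility: both writer and reader must agree on the contents and order of the type array, which is why the set of types has to be known ahead of time.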
Writable collections There are four Writable collection types in the org.apache.hadoop.io package: ArrayWritable, TwoDArrayWritable, MapWritable, and SortedMapWritable. ArrayWritable and TwoDArrayWritable are Writable implementations for arrays and two-dimensional arrays (array of arrays) of Writable instances. All the elements of an ArrayWritable or a TwoDArrayWritable must be instances of the same class, which is specified at construction, as follows: ArrayWritable writable = new ArrayWritable(Text.class);
Slide 120: In contexts where the Writable is defined by type, such as in SequenceFile keys or values, or as input to MapReduce in general, you need to subclass ArrayWritable (or TwoDArrayWritable, as appropriate) to set the type statically. For example: public class TextArrayWritable extends ArrayWritable { public TextArrayWritable() { super(Text.class); } } ArrayWritable and TwoDArrayWritable both have get() and set() methods, as well as a toArray() method, which creates a shallow copy of the array (or 2D array). MapWritable and SortedMapWritable are implementations of java.util.Map<Writable, Writable> and java.util.SortedMap<WritableComparable, Writable>, respectively. The type of each key and value field is a part of the serialization format for that field. The type is stored as a single byte that acts as an index into an array of types. The array is populated with the standard types in the org.apache.hadoop.io package, but custom Writable types are accommodated, too, by writing a header that encodes the type array for nonstandard types. As they are implemented, MapWritable and SortedMapWritable use positive byte values for custom types, so a maximum of 127 distinct nonstandard Writable classes can be used in any particular MapWritable or SortedMapWritable instance. Here’s a demonstration of using a MapWritable with different types for keys and values: MapWritable src = new MapWritable(); src.put(new IntWritable(1), new Text("cat")); src.put(new VIntWritable(2), new LongWritable(163)); MapWritable dest = new MapWritable(); WritableUtils.cloneInto(dest, src); assertThat((Text) dest.get(new IntWritable(1)), is(new Text("cat"))); assertThat((LongWritable) dest.get(new VIntWritable(2)), is(new LongWritable(163))); Conspicuous by their absence are Writable collection implementations for sets and lists. A set can be emulated by using a MapWritable (or a SortedMapWritable for a sorted set), with NullWritable values. 
For lists of a single type of Writable, ArrayWritable is adequate, but to store different types of Writable in a single list, you can use GenericWritable to wrap the elements in an ArrayWritable. Alternatively, you could write a general ListWritable using the ideas from MapWritable. Implementing a Custom Writable Hadoop comes with a useful set of Writable implementations that serve most purposes; however, on occasion, you may need to write your own custom implementation. With a custom Writable, you have full control over the binary representation and the sort order. Because Writables are at the heart of the MapReduce data path, tuning the binary representation can have a significant effect on performance. The stock Writable
Slide 121: implementations that come with Hadoop are well-tuned, but for more elaborate structures, it is often better to create a new Writable type, rather than compose the stock types. To demonstrate how to create a custom Writable, we shall write an implementation that represents a pair of strings, called TextPair. The basic implementation is shown in Example 4-7. Example 4-7. A Writable implementation that stores a pair of Text objects import java.io.*; import org.apache.hadoop.io.*; public class TextPair implements WritableComparable<TextPair> { private Text first; private Text second; public TextPair() { set(new Text(), new Text()); } public TextPair(String first, String second) { set(new Text(first), new Text(second)); } public TextPair(Text first, Text second) { set(first, second); } public void set(Text first, Text second) { this.first = first; this.second = second; } public Text getFirst() { return first; } public Text getSecond() { return second; } @Override public void write(DataOutput out) throws IOException { first.write(out); second.write(out); } @Override public void readFields(DataInput in) throws IOException { first.readFields(in); second.readFields(in);
Slide 122: } @Override public int hashCode() { return first.hashCode() * 163 + second.hashCode(); } @Override public boolean equals(Object o) { if (o instanceof TextPair) { TextPair tp = (TextPair) o; return first.equals(tp.first) && second.equals(tp.second); } return false; } @Override public String toString() { return first + "\t" + second; } @Override public int compareTo(TextPair tp) { int cmp = first.compareTo(tp.first); if (cmp != 0) { return cmp; } return second.compareTo(tp.second); } } The first part of the implementation is straightforward: there are two Text instance variables, first and second, and associated constructors, getters, and setters. All Writable implementations must have a default constructor so that the MapReduce framework can instantiate them, then populate their fields by calling readFields(). Writable instances are mutable and often reused, so you should take care to avoid allocating objects in the write() or readFields() methods. TextPair’s write() method serializes each Text object in turn to the output stream, by delegating to the Text objects themselves. Similarly, readFields() deserializes the bytes from the input stream by delegating to each Text object. The DataOutput and DataInput interfaces have a rich set of methods for serializing and deserializing Java primitives, so, in general, you have complete control over the wire format of your Writable object. Just as you would for any value object you write in Java, you should override the hashCode(), equals(), and toString() methods from java.lang.Object. The hashCode() method is used by the HashPartitioner (the default partitioner in MapReduce) to choose a reduce partition, so you should make sure that you write a good hash function that mixes well to ensure reduce partitions are of a similar size.
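The link between hashCode() and partition sizes can be sketched without Hadoop. The example below assumes that the partitioner masks off the sign bit of the hash and takes the remainder modulo the number of reducers, which is my understanding of how HashPartitioner behaves and is not stated in the text. PartitionSketch, pairHash() and partitionFor() are hypothetical names, and the fields are plain Strings, whose hash codes differ from those of Text.

```java
import java.util.Arrays;

// JDK-only sketch of hash-based partition selection; not Hadoop's actual class.
final class PartitionSketch {

  // Same combining formula as TextPair.hashCode() in Example 4-7, but with String fields.
  static int pairHash(String first, String second) {
    return first.hashCode() * 163 + second.hashCode();
  }

  // Clears the sign bit so that a negative hash still yields a valid partition index.
  static int partitionFor(int hash, int numPartitions) {
    return (hash & Integer.MAX_VALUE) % numPartitions;
  }

  public static void main(String[] args) {
    int partitions = 4;
    int[] counts = new int[partitions];
    // Count how many of 100 made-up keys land in each partition.
    for (int year = 1900; year < 2000; year++) {
      counts[partitionFor(pairHash(Integer.toString(year), "station"), partitions)]++;
    }
    System.out.println(Arrays.toString(counts));
  }
}
```

If the printed counts were heavily skewed, some reducers would receive far more keys than others, which is the situation a well-mixing hash function is meant to avoid.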
Slide 123: If you ever plan to use your custom Writable with TextOutputFormat, then you must implement its toString() method. TextOutputFormat calls toString() on keys and values for their output representation. For TextPair, we write the underlying Text objects as strings separated by a tab character. TextPair is an implementation of WritableComparable, so it provides an implementation of the compareTo() method that imposes the ordering you would expect: it sorts by the first string followed by the second. Notice that TextPair differs from TextArrayWritable from the previous section (apart from the number of Text objects it can store), since TextArrayWritable is only a Writable, not a WritableComparable. Implementing a RawComparator for speed The code for TextPair in Example 4-7 will work as it stands; however, there is a further optimization we can make. As explained in “WritableComparable and comparators” on page 88, when TextPair is being used as a key in MapReduce, it will have to be deserialized into an object for the compareTo() method to be invoked. What if it were possible to compare two TextPair objects just by looking at their serialized representations? It turns out that we can do this, since TextPair is the concatenation of two Text objects, and the binary representation of a Text object is a variable-length integer containing the number of bytes in the UTF-8 representation of the string, followed by the UTF-8 bytes themselves. The trick is to read the initial length, so we know how long the first Text object’s byte representation is; then we can delegate to Text’s RawComparator, and invoke it with the appropriate offsets for the first or second string. Example 4-8 gives the details (note that this code is nested in the TextPair class). Example 4-8. 
A RawComparator for comparing TextPair byte representations public static class Comparator extends WritableComparator { private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator(); public Comparator() { super(TextPair.class); } @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { try { int firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1); int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2); int cmp = TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2); if (cmp != 0) { return cmp; }
Slide 124: return TEXT_COMPARATOR.compare(b1, s1 + firstL1, l1 - firstL1, b2, s2 + firstL2, l2 - firstL2); } catch (IOException e) { throw new IllegalArgumentException(e); } } } static { WritableComparator.define(TextPair.class, new Comparator()); } We actually subclass WritableComparator rather than implement RawComparator directly, since it provides some convenience methods and default implementations. The subtle part of this code is calculating firstL1 and firstL2, the lengths of the first Text field in each byte stream. Each is made up of the length of the variable-length integer (returned by decodeVIntSize() on WritableUtils) and the value it is encoding (returned by readVInt()). The static block registers the raw comparator so that whenever MapReduce sees the TextPair class, it knows to use the raw comparator as its default comparator. Custom comparators As we can see with TextPair, writing raw comparators takes some care, since you have to deal with details at the byte level. It is worth looking at some of the implementations of Writable in the org.apache.hadoop.io package for further ideas, if you need to write your own. The utility methods on WritableUtils are very handy, too. Custom comparators should also be written to be RawComparators, if possible. These are comparators that implement a different sort order to the natural sort order defined by the default comparator. Example 4-9 shows a comparator for TextPair, called FirstComparator, that considers only the first string of the pair. Note that we override the compare() method that takes objects so both compare() methods have the same semantics. We will make use of this comparator in Chapter 8, when we look at joins and secondary sorting in MapReduce (see “Joins” on page 247). Example 4-9. 
A custom RawComparator for comparing the first field of TextPair byte representations public static class FirstComparator extends WritableComparator { private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator(); public FirstComparator() { super(TextPair.class); } @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
Slide 125: try { int firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1); int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2); return TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2); } catch (IOException e) { throw new IllegalArgumentException(e); } } @Override public int compare(WritableComparable a, WritableComparable b) { if (a instanceof TextPair && b instanceof TextPair) { return ((TextPair) a).first.compareTo(((TextPair) b).first); } return super.compare(a, b); } } Serialization Frameworks Although most MapReduce programs use Writable key and value types, this isn’t mandated by the MapReduce API. In fact, any types can be used; the only requirement is that there be a mechanism that translates to and from a binary representation of each type. To support this, Hadoop has an API for pluggable serialization frameworks. A serialization framework is represented by an implementation of Serialization (in the org.apache.hadoop.io.serializer package). WritableSerialization, for example, is the implementation of Serialization for Writable types. A Serialization defines a mapping from types to Serializer instances (for turning an object into a byte stream) and Deserializer instances (for turning a byte stream into an object). Set the io.serializations property to a comma-separated list of classnames to register Serialization implementations. Its default value is org.apache.hadoop.io.serializer.WritableSerialization, which means that only Writable objects can be serialized or deserialized out of the box. Hadoop includes a class called JavaSerialization that uses Java Object Serialization. Although it makes it convenient to be able to use standard Java types in MapReduce programs, like Integer or String, Java Object Serialization is not as efficient as Writables, so it’s not worth making this trade-off (see the sidebar on the next page).
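Looking back at the raw comparators in Examples 4-8 and 4-9, the underlying idea of comparing serialized pairs without deserializing them can be tried out with the JDK alone. This is a simplified sketch under a loud assumption: every field is shorter than 128 bytes, so each length prefix fits in a single byte, whereas the real code handles variable-length integers through WritableUtils. RawPairCompareSketch and its methods are hypothetical names, not Hadoop APIs.

```java
import java.nio.charset.StandardCharsets;

// JDK-only sketch: compares [len][bytes][len][bytes] records directly on their bytes.
final class RawPairCompareSketch {

  // Serializes two strings, each preceded by a one-byte length (assumed under 128).
  static byte[] serialize(String first, String second) {
    byte[] a = first.getBytes(StandardCharsets.UTF_8);
    byte[] b = second.getBytes(StandardCharsets.UTF_8);
    if (a.length > 127 || b.length > 127) {
      throw new IllegalArgumentException("Sketch only supports fields under 128 bytes");
    }
    byte[] out = new byte[2 + a.length + b.length];
    out[0] = (byte) a.length;
    System.arraycopy(a, 0, out, 1, a.length);
    out[1 + a.length] = (byte) b.length;
    System.arraycopy(b, 0, out, 2 + a.length, b.length);
    return out;
  }

  // Lexicographic comparison of two byte ranges, treating bytes as unsigned.
  static int compareBytes(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
    int n = Math.min(l1, l2);
    for (int i = 0; i < n; i++) {
      int x = b1[s1 + i] & 0xff;
      int y = b2[s2 + i] & 0xff;
      if (x != y) {
        return x - y;
      }
    }
    return l1 - l2;
  }

  // Orders by the first field, then the second, reading only lengths and raw bytes.
  static int compare(byte[] p1, byte[] p2) {
    int firstL1 = p1[0]; // length of the first field in each record
    int firstL2 = p2[0];
    int cmp = compareBytes(p1, 1, firstL1, p2, 1, firstL2);
    if (cmp != 0) {
      return cmp;
    }
    int second1 = 1 + firstL1; // offset of the second field's length byte
    int second2 = 1 + firstL2;
    return compareBytes(p1, second1 + 1, p1[second1], p2, second2 + 1, p2[second2]);
  }

  public static void main(String[] args) {
    System.out.println(compare(serialize("a", "b"), serialize("a", "c")) < 0); // prints true
    System.out.println(compare(serialize("b", "a"), serialize("a", "z")) > 0); // prints true
  }
}
```

Dropping the second compareBytes() call and returning the first result unconditionally gives the behavior of a comparator that looks only at the first field.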
Slide 126: Why Not Use Java Object Serialization? Java comes with its own serialization mechanism, called Java Object Serialization (often referred to simply as “Java Serialization”), that is tightly integrated with the language, so it’s natural to ask why this wasn’t used in Hadoop. Here’s what Doug Cutting said in response to that question: Why didn’t I use Serialization when we first started Hadoop? Because it looked big and hairy and I thought we needed something lean and mean, where we had precise control over exactly how objects are written and read, since that is central to Hadoop. With Serialization you can get some control, but you have to fight for it. The logic for not using RMI was similar. Effective, high-performance inter-process communications are critical to Hadoop. I felt like we’d need to precisely control how things like connections, timeouts and buffers are handled, and RMI gives you little control over those. The problem is that Java Serialization doesn’t meet the criteria for a serialization format listed earlier: compact, fast, extensible, and interoperable. Java Serialization is not compact: it writes the classname of each object being written to the stream—this is true of classes that implement java.io.Serializable or java.io.Externalizable. Subsequent instances of the same class write a reference handle to the first occurrence, which occupies only 5 bytes. However, reference handles don’t work well with random access, since the referent class may occur at any point in the preceding stream—that is, there is state stored in the stream. Even worse, reference handles play havoc with sorting records in a serialized stream, since the first record of a particular class is distinguished and must be treated as a special case. All these problems are avoided by not writing the classname to the stream at all, which is the approach that Writable takes. This makes the assumption that the client knows the expected type. 
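The compactness point can be checked with a short JDK-only experiment. The sketch below writes the same integer once through Java Object Serialization and once as a bare 4-byte value, which stands in for a format that omits type information because the reader already knows the type. SerializedSizeSketch and its methods are hypothetical names; the exact size of the Java-serialized form is deliberately not stated here, only that it is larger.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;

// JDK-only sketch comparing the size of two encodings of the same integer.
final class SerializedSizeSketch {

  // Size of the value when written with Java Object Serialization (stream header, class data, value).
  static int javaSerializedSize(Object value) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(value);
      }
      return bytes.size();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Size of the value when only its four bytes are written and the type is implied.
  static int rawIntSize(int value) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (DataOutputStream out = new DataOutputStream(bytes)) {
        out.writeInt(value);
      }
      return bytes.size();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println("Java Object Serialization: " + javaSerializedSize(Integer.valueOf(163)) + " bytes");
    System.out.println("Bare int: " + rawIntSize(163) + " bytes");
  }
}
```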
The result is that the format is considerably more compact than Java Serialization, and random access and sorting work as expected since each record is independent of the others (so there is no stream state). Java Serialization is a general-purpose mechanism for serializing graphs of objects, so it necessarily has some overhead for serialization and deserialization operations. What’s more, the deserialization procedure creates a new instance for each object deserialized from the stream. Writable objects, on the other hand, can be (and often are) reused. For example, for a MapReduce job, which at its core serializes and deserializes billions of records of just a handful of different types, the savings gained by not having to allocate new objects are significant. In terms of extensibility, Java Serialization has some support for evolving a type, but it is brittle and hard to use effectively (Writables have no support: the programmer has to manage them himself). In principle, other languages could interpret the Java Serialization stream protocol (defined by the Java Object Serialization Specification), but in practice there are no widely
Slide 127: used implementations in other languages, so it is a Java-only solution. The situation is the same for Writables. Serialization IDL There are a number of other serialization frameworks that approach the problem in a different way: rather than defining types through code, you define them in a language-neutral, declarative fashion, using an interface description language (IDL). The system can then generate types for different languages, which is good for interoperability. They also typically define versioning schemes that make type evolution straightforward. Hadoop’s own Record I/O (found in the org.apache.hadoop.record package) has an IDL that is compiled into Writable objects, which makes it convenient for generating types that are compatible with MapReduce. For whatever reason, however, Record I/O was not widely used, and has been deprecated in favor of Avro. Apache Thrift and Google Protocol Buffers are both popular serialization frameworks, and they are commonly used as a format for persistent binary data. There is limited support for these as MapReduce formats;‖ however, Thrift is used in parts of Hadoop to provide cross-language APIs, such as the “thriftfs” contrib module, where it is used to expose an API to Hadoop filesystems (see “Thrift” on page 49). In the next section, we look at Avro, an IDL-based serialization framework designed to work well with large-scale data processing in Hadoop. Avro Apache Avro# is a language-neutral data serialization system. The project was created by Doug Cutting (the creator of Hadoop) to address the major downside of Hadoop Writables: lack of language portability. Having a data format that can be processed by many languages (currently C, C++, Java, Python, and Ruby) makes it easier to share datasets with a wider audience than one tied to a single language. It is also more future-proof, allowing data to potentially outlive the language used to read and write it. But why a new data serialization system? 
Avro has a set of features that, taken together, differentiate it from other systems like Apache Thrift or Google’s Protocol Buffers.* Like these systems and others, Avro data is described using a language-independent schema. However, unlike some other systems, code generation is optional in Avro, ‖ You can find the latest status for a Thrift Serialization at https://issues.apache.org/jira/browse/HADOOP-3787, and a Protocol Buffers Serialization at https://issues.apache.org/jira/browse/HADOOP-3788. Twitter’s Elephant Bird project (http://github.com/kevinweil/elephant-bird) includes tools for working with Protocol Buffers in Hadoop. # Named after the British aircraft manufacturer from the 20th century. * Avro also performs favorably compared to other serialization libraries, as the benchmarks at http://code.google.com/p/thrift-protobuf-compare/ demonstrate.
Slide 128: which means you can read and write data that conforms to a given schema even if your code has not seen that particular schema before. To achieve this, Avro assumes that the schema is always present—at both read and write time—which makes for a very compact encoding, since encoded values do not need to be tagged with a field identifier. Avro schemas are usually written in JSON, and data is usually encoded using a binary format, but there are other options, too. There is a higher-level language called Avro IDL, for writing schemas in a C-like language that is more familiar to developers. There is also a JSON-based data encoder, which, being human-readable, is useful for prototyping and debugging Avro data. The Avro specification precisely defines the binary format that all implementations must support. It also specifies many of the other features of Avro that implementations should support. One area that the specification does not rule on, however, is APIs: implementations have complete latitude in the API they expose for working with Avro data, since each one is necessarily language-specific. The fact that there is only one binary format is significant, since it means the barrier for implementing a new language binding is lower, and avoids the problem of a combinatorial explosion of languages and formats, which would harm interoperability. Avro has rich schema resolution capabilities. Within certain carefully defined constraints, the schema used to read data need not be identical to the schema that was used to write the data. This is the mechanism by which Avro supports schema evolution. For example, a new, optional field may be added to a record by declaring it in the schema used to read the old data. New and old clients alike will be able to read the old data, while new clients can write new data that uses the new field. 
Conversely, if an old client sees newly encoded data, it will gracefully ignore the new field and carry on processing as it would have done with old data. Avro specifies an object container format for sequences of objects—similar to Hadoop’s sequence file. An Avro data file has a metadata section where the schema is stored, which makes the file self-describing. Avro data files support compression and are splittable, which is crucial for a MapReduce data input format. Furthermore, since Avro was designed with MapReduce in mind, in the future it will be possible to use Avro to bring first-class MapReduce APIs (that is, ones that are richer than Streaming, like the Java API, or C++ Pipes) to languages that speak Avro. Avro can be used for RPC, too, although this isn’t covered here. The Hadoop project has plans to migrate to Avro RPC, which will have several benefits, including supporting rolling upgrades, and the possibility of multilanguage clients, such as an HDFS client implemented entirely in C. Avro data types and schemas Avro defines a small number of data types, which can be used to build application-specific data structures by writing schemas. For interoperability, implementations must support all Avro types.
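To make the earlier remark about untagged, schema-driven encoding concrete, here is a minimal JDK-only sketch of how a record with two string fields could be laid out. It follows my understanding of the Avro specification's binary encoding (a string is a zigzag, variable-length encoded byte count followed by UTF-8 bytes, and record fields are written in schema order with no field identifiers); that detail is not given in the text above, and the sketch is no substitute for the Avro library. UntaggedEncodingSketch and its methods are hypothetical names.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// JDK-only sketch of an untagged record encoding; field order comes from the schema.
final class UntaggedEncodingSketch {

  // Zigzag-encodes a long, then writes it as a base-128 varint, low-order group first.
  static void writeLong(ByteArrayOutputStream out, long n) {
    long z = (n << 1) ^ (n >> 63);
    while ((z & ~0x7FL) != 0) {
      out.write((int) ((z & 0x7F) | 0x80)); // more groups follow
      z >>>= 7;
    }
    out.write((int) z);
  }

  // A string is its UTF-8 byte count (encoded as a long) followed by the bytes.
  static void writeString(ByteArrayOutputStream out, String s) {
    byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
    writeLong(out, utf8.length);
    out.write(utf8, 0, utf8.length);
  }

  // Encodes a two-string record by writing the fields in schema order, with no tags.
  static byte[] encodePair(String left, String right) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    writeString(out, left);
    writeString(out, right);
    return out.toByteArray();
  }

  static String toHex(byte[] bytes) {
    StringBuilder sb = new StringBuilder();
    for (byte b : bytes) {
      sb.append(String.format("%02x", b));
    }
    return sb.toString();
  }

  public static void main(String[] args) {
    System.out.println(toHex(encodePair("L", "R"))); // prints 024c0252
  }
}
```

Nothing in the four output bytes says which field is which; a reader can only make sense of them with the schema in hand, which is the trade-off the text describes.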
Slide 129: Avro’s primitive types are listed in Table 4-8. Each primitive type may also be specified using a more verbose form, using the type attribute, such as: { "type": "null" }
Table 4-8. Avro primitive types
Type | Description | Schema
null | The absence of a value | "null"
boolean | A binary value | "boolean"
int | 32-bit signed integer | "int"
long | 64-bit signed integer | "long"
float | Single precision (32-bit) IEEE 754 floating-point number | "float"
double | Double precision (64-bit) IEEE 754 floating-point number | "double"
bytes | Sequence of 8-bit unsigned bytes | "bytes"
string | Sequence of Unicode characters | "string"
Avro also defines the complex types listed in Table 4-9, along with a representative example of a schema of each type.
Table 4-9. Avro complex types
Type | Description | Schema example
array | An ordered collection of objects. All objects in a particular array must have the same schema. | { "type": "array", "items": "long" }
map | An unordered collection of key-value pairs. Keys must be strings, values may be any type, although within a particular map all values must have the same schema. | { "type": "map", "values": "string" }
record | A collection of named fields of any type. | { "type": "record", "name": "WeatherRecord", "doc": "A weather reading.", "fields": [ {"name": "year", "type": "int"}, {"name": "temperature", "type": "int"}, {"name": "stationId", "type": "string"} ] }
enum | A set of named values. | { "type": "enum", "name": "Cutlery", "doc": "An eating utensil.", "symbols": ["KNIFE", "FORK", "SPOON"] }
fixed | A fixed number of 8-bit unsigned bytes. | { "type": "fixed", "name": "Md5Hash", "size": 16 }
union | A union of schemas. A union is represented by a JSON array, where each element in the array is a schema. Data represented by a union must match one of the schemas in the union. | [ "null", "string", {"type": "map", "values": "string"} ]
Slide 130: Each Avro language API has a representation for each Avro type that is specific to the language. For example, Avro’s double type is represented in C, C++, and Java by a double, in Python by a float, and in Ruby by a Float. What’s more, there may be more than one representation, or mapping, for a language. All languages support a dynamic mapping, which can be used even when the schema is not known ahead of run time. Java calls this the generic mapping. In addition, the Java and C++ implementations can generate code to represent the data for an Avro schema. Code generation, which is called the specific mapping in Java, is an optimization that is useful when you have a copy of the schema before you read or write data. Generated classes also provide a more domain-oriented API for user code than generic ones. Java has a third mapping, the reflect mapping, which maps Avro types onto preexisting Java types, using reflection. It is slower than the generic and specific mappings, and is not generally recommended for new applications. Java’s type mappings are shown in Table 4-10. As the table shows, the specific mapping is the same as the generic one unless otherwise noted (and the reflect one is the same as the specific one unless noted). The specific mapping only differs from the generic one for record, enum, and fixed, all of which have generated classes (the name of which is controlled by the name and optional namespace attribute). Why don’t the Java generic and specific mappings use Java String to represent an Avro string? The answer is efficiency: the Avro Utf8 type is mutable, so it may be reused for reading or writing a series of values. 
Also, Java String decodes UTF-8 at object construction time, while Avro Utf8 does it lazily, which can increase performance in some cases. Note that the Java reflect mapping does use Java’s String class, since it is designed for Java compatibility, not performance.
Table 4-10. Avro Java type mappings
Avro type | Generic Java mapping | Specific Java mapping | Reflect Java mapping
null | null type | |
boolean | boolean | |
int | int | | short or int
long | long | |
float | float | |
double | double | |
bytes | java.nio.ByteBuffer | | Array of byte
string | org.apache.avro.util.Utf8 | | java.lang.String
array | org.apache.avro.generic.GenericArray | | Array or java.util.Collection
map | java.util.Map | |
record | org.apache.avro.generic.GenericRecord | Generated class implementing org.apache.avro.specific.SpecificRecord. | Arbitrary user class with a zero-argument constructor. All inherited nontransient instance fields are used.
enum | java.lang.String | Generated Java enum | Arbitrary Java enum
fixed | org.apache.avro.generic.GenericFixed | Generated class implementing org.apache.avro.specific.SpecificFixed. | org.apache.avro.generic.GenericFixed
union | java.lang.Object | |
Slide 131: In-memory serialization and deserialization Avro provides APIs for serialization and deserialization, which are useful when you want to integrate Avro with an existing system, such as a messaging system where the framing format is already defined. In other cases, consider using Avro’s data file format. Let’s write a Java program to read and write Avro data to and from streams. We’ll start with a simple Avro schema for representing a pair of strings as a record: { "type": "record", "name": "Pair", "doc": "A pair of strings.", "fields": [ {"name": "left", "type": "string"}, {"name": "right", "type": "string"} ] } If this schema is saved in a file on the classpath called Pair.avsc (.avsc is the conventional extension for an Avro schema), then we can load it using the following statement: Schema schema = Schema.parse(getClass().getResourceAsStream("Pair.avsc")); We can create an instance of an Avro record using the generic API as follows:
Slide 132: GenericRecord datum = new GenericData.Record(schema); datum.put("left", new Utf8("L")); datum.put("right", new Utf8("R")); Notice that we construct Avro Utf8 instances for the record’s string fields. Next, we serialize the record to an output stream: ByteArrayOutputStream out = new ByteArrayOutputStream(); DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema); Encoder encoder = new BinaryEncoder(out); writer.write(datum, encoder); encoder.flush(); out.close(); There are two important objects here: the DatumWriter and the Encoder. A DatumWriter translates data objects into the types understood by an Encoder, which the latter writes to the output stream. Here we are using a GenericDatumWriter, which passes the fields of GenericRecord to the Encoder, in this case the BinaryEncoder. In this example only one object is written to the stream, but we could call write() with more objects before closing the stream if we wanted to. The GenericDatumWriter needs to be passed the schema since it follows the schema to determine which values from the data objects to write out. After we have called the writer’s write() method, we flush the encoder, then close the output stream. We can reverse the process and read the object back from the byte buffer: DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema); Decoder decoder = DecoderFactory.defaultFactory() .createBinaryDecoder(out.toByteArray(), null); GenericRecord result = reader.read(null, decoder); assertThat(result.get("left").toString(), is("L")); assertThat(result.get("right").toString(), is("R")); We pass null to the calls to createBinaryDecoder() and read() since we are not reusing objects here (the decoder or the record, respectively). Let’s look briefly at the equivalent code using the specific API. 
We can generate the Pair class from the schema file, by using the Avro tools JAR file:† % java -jar $AVRO_HOME/avro-tools-*.jar compile schema \ > avro/src/main/resources/Pair.avsc avro/src/main/java Then instead of a GenericRecord we construct a Pair instance, which we write to the stream using a SpecificDatumWriter, and read back using a SpecificDatumReader: Pair datum = new Pair(); datum.left = new Utf8("L"); datum.right = new Utf8("R"); ByteArrayOutputStream out = new ByteArrayOutputStream(); † Avro can be downloaded in both source and binary forms from http://avro.apache.org/releases.html. 108 | Chapter 4: Hadoop I/O
Slide 133: DatumWriter<Pair> writer = new SpecificDatumWriter<Pair>(Pair.class); Encoder encoder = new BinaryEncoder(out); writer.write(datum, encoder); encoder.flush(); out.close(); DatumReader<Pair> reader = new SpecificDatumReader<Pair>(Pair.class); Decoder decoder = DecoderFactory.defaultFactory() .createBinaryDecoder(out.toByteArray(), null); Pair result = reader.read(null, decoder); assertThat(result.left.toString(), is("L")); assertThat(result.right.toString(), is("R")); Avro data files Avro’s object container file format is for storing sequences of Avro objects. It is very similar in design to Hadoop’s sequence files, which are described in “SequenceFile” on page 116. The main difference is that Avro data files are designed to be portable across languages, so, for example, you can write a file in Python and read it in C (we will do exactly this in the next section). A data file has a header containing metadata, including the Avro schema and a sync marker, followed by a series of (optionally compressed) blocks containing the serialized Avro objects. Blocks are separated by a sync marker that is unique to the file (the marker for a particular file is found in the header) and that permits rapid resynchronization with a block boundary after seeking to an arbitrary point in the file, such as an HDFS block boundary. Thus, Avro data files are splittable, which makes them amenable to efficient MapReduce processing. Writing Avro objects to a data file is similar to writing to a stream. We use a DatumWriter, as before, but instead of using an Encoder, we create a DataFileWriter instance with the DatumWriter. 
Then we can create a new data file (which, by convention, has a .avro extension) and append objects to it: File file = new File("data.avro"); DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema); DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(writer); dataFileWriter.create(schema, file); dataFileWriter.append(datum); dataFileWriter.close(); The objects that we write to the data file must conform to the file’s schema, otherwise an exception will be thrown when we call append(). This example demonstrates writing to a local file (java.io.File in the previous snippet), but we can write to any java.io.OutputStream by using the overloaded create() method on DataFileWriter. To write a file to HDFS, for example, get an OutputStream by calling create() on FileSystem (see “Writing Data” on page 55). Serialization | 109
Slide 134: Reading back objects from a data file is similar to the earlier case of reading objects from an in-memory stream, with one important difference: we don’t have to specify a schema since it is read from the file metadata. Indeed, we can get the schema from the DataFileReader instance, using getSchema(), and verify that it is the same as the one we used to write the original object with: DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(); DataFileReader<GenericRecord> dataFileReader = new DataFileReader<GenericRecord>(file, reader); assertThat("Schema is the same", schema, is(dataFileReader.getSchema())); DataFileReader is a regular Java iterator, so we can iterate through its data objects by calling its hasNext() and next() methods. The following snippet checks that there is only one record, and that it has the expected field values: assertThat(dataFileReader.hasNext(), is(true)); GenericRecord result = dataFileReader.next(); assertThat(result.get("left").toString(), is("L")); assertThat(result.get("right").toString(), is("R")); assertThat(dataFileReader.hasNext(), is(false)); Rather than using the usual next() method, however, it is preferable to use the overloaded form that takes an instance of the object to be returned (in this case, GenericRecord), since it will reuse the object and save allocation and garbage collection costs for files containing many objects. The following is idiomatic: GenericRecord record = null; while (dataFileReader.hasNext()) { record = dataFileReader.next(record); // process record } If object reuse is not important, you can use this shorter form: for (GenericRecord record : dataFileReader) { // process record } For the general case of reading a file on a Hadoop file system, use Avro’s FsInput to specify the input file using a Hadoop Path object. 
DataFileReader actually offers random access to an Avro data file (via its seek() and sync() methods); however, in many cases, sequential streaming access is sufficient, for which DataFileStream should be used. DataFileStream can read from any Java InputStream.

Interoperability

To demonstrate Avro’s language interoperability, let’s write a data file using one language (Python) and read it back with another (C).

Python API. The program in Example 4-10 reads comma-separated strings from standard input and writes them as Pair records to an Avro data file. As in the Java code for writing a data file, we create a DatumWriter and a DataFileWriter object. Notice that we have
110 | Chapter 4: Hadoop I/O
Slide 135: embedded the Avro schema in the code, although we could equally well have read it from a file. Python represents Avro records as dictionaries; each line that is read from standard input is turned into a dict object and appended to the DataFileWriter.

Example 4-10. A Python program for writing Avro record pairs to a data file

    import sys

    from avro import schema
    from avro import io
    from avro import datafile

    if __name__ == '__main__':
        if len(sys.argv) != 2:
            sys.exit('Usage: %s <data_file>' % sys.argv[0])
        avro_file = sys.argv[1]
        writer = open(avro_file, 'wb')
        datum_writer = io.DatumWriter()
        schema_object = schema.parse("""\
    {
      "type": "record",
      "name": "Pair",
      "doc": "A pair of strings.",
      "fields": [
        {"name": "left", "type": "string"},
        {"name": "right", "type": "string"}
      ]
    }""")
        dfw = datafile.DataFileWriter(writer, datum_writer, schema_object)
        for line in sys.stdin.readlines():
            # Split each "left,right" input line into its two fields.
            (left, right) = line.strip().split(',')
            dfw.append({'left': left, 'right': right})
        dfw.close()

Before we can run the program, we need to install Avro for Python:

    % easy_install avro

To run the program, we specify the name of the file to write output to (pairs.avro) and send input pairs over standard input, marking the end of file by typing Control-D:

    % python avro/src/main/py/write_pairs.py pairs.avro
    a,1
    c,2
    b,3
    b,2
    ^D

C API. Next we’ll turn to the C API and write a program to display the contents of pairs.avro; see Example 4-11.‡
Serialization | 111
Slide 136: Example 4-11. A C program for reading Avro record pairs from a data file

    #include <avro.h>
    #include <stdio.h>
    #include <stdlib.h>

    int main(int argc, char *argv[]) {
      if (argc != 2) {
        fprintf(stderr, "Usage: dump_pairs <data_file>\n");
        exit(EXIT_FAILURE);
      }

      const char *avrofile = argv[1];
      avro_schema_error_t error;
      avro_file_reader_t filereader;
      avro_datum_t pair;
      avro_datum_t left;
      avro_datum_t right;
      int rval;
      char *p;

      avro_file_reader(avrofile, &filereader);
      while (1) {
        rval = avro_file_reader_read(filereader, NULL, &pair);
        if (rval) break;
        if (avro_record_get(pair, "left", &left) == 0) {
          avro_string_get(left, &p);
          fprintf(stdout, "%s,", p);
        }
        if (avro_record_get(pair, "right", &right) == 0) {
          avro_string_get(right, &p);
          fprintf(stdout, "%s\n", p);
        }
      }
      avro_file_reader_close(filereader);
      return 0;
    }

The core of the program does three things:
1. opens a file reader of type avro_file_reader_t by calling Avro’s avro_file_reader function,§
2. reads Avro data from the file reader with the avro_file_reader_read function in a while loop until there are no pairs left (as determined by the return value rval), and
3. closes the file reader with avro_file_reader_close.

The avro_file_reader_read function accepts a schema as its second argument to support the case where the schema for reading is different from the one used when the file
‡ For the general case, the Avro tools JAR file has a tojson command that dumps the contents of an Avro data file as JSON.
§ Avro functions and types have an avro_ prefix and are defined in the avro.h header file.
112 | Chapter 4: Hadoop I/O
Slide 137: was written (this is explained in the next section), but we simply pass in NULL, which tells Avro to use the data file’s schema. The third argument is a pointer to an avro_datum_t object, which is populated with the contents of the next record read from the file. We unpack the pair structure into its fields by calling avro_record_get, and then we extract the value of these fields as strings using avro_string_get, which we print to the console.

Running the program using the output of the Python program prints the original input:

    % ./dump_pairs pairs.avro
    a,1
    c,2
    b,3
    b,2

We have successfully exchanged complex data between two Avro implementations.

Schema resolution

We can choose to use a different schema for reading the data back (the reader’s schema) from the one we used to write it (the writer’s schema). This is a powerful tool, since it enables schema evolution. To illustrate, consider a new schema for string pairs, with an added description field:

    {
      "type": "record",
      "name": "Pair",
      "doc": "A pair of strings with an added field.",
      "fields": [
        {"name": "left", "type": "string"},
        {"name": "right", "type": "string"},
        {"name": "description", "type": "string", "default": ""}
      ]
    }

We can use this schema to read the data we serialized earlier, since, crucially, we have given the description field a default value (the empty string‖), which Avro will use when there is no such field defined in the records it is reading. Had we omitted the default attribute, we would get an error when trying to read the old data.

To make the default value null, rather than the empty string, we would instead define the description field using a union with the null Avro type. The default is then the JSON value null, not the string "null":

    {"name": "description", "type": ["null", "string"], "default": null}

‖ Default values for fields are encoded using JSON. See the Avro specification for a description of this encoding for each data type.
Serialization | 113
Slide 138: When the reader’s schema is different from the writer’s, we use the constructor for GenericDatumReader that takes two schema objects, the writer’s and the reader’s, in that order:

    DatumReader<GenericRecord> reader =
        new GenericDatumReader<GenericRecord>(schema, newSchema);
    Decoder decoder = DecoderFactory.defaultFactory()
        .createBinaryDecoder(out.toByteArray(), null);
    GenericRecord result = reader.read(null, decoder);
    assertThat(result.get("left").toString(), is("L"));
    assertThat(result.get("right").toString(), is("R"));
    assertThat(result.get("description").toString(), is(""));

For data files, which have the writer’s schema stored in the metadata, we only need to specify the reader’s schema explicitly, which we can do by passing null for the writer’s schema:

    DatumReader<GenericRecord> reader =
        new GenericDatumReader<GenericRecord>(null, newSchema);

Another common use of a different reader’s schema is to drop fields in a record, an operation called projection. This is useful when you have records with a large number of fields and you only want to read some of them. For example, this schema can be used to get only the right field of a Pair:

    {
      "type": "record",
      "name": "Pair",
      "doc": "The right field of a pair of strings.",
      "fields": [
        {"name": "right", "type": "string"}
      ]
    }

The rules for schema resolution have a direct bearing on how schemas may evolve from one version to the next, and are spelled out in the Avro specification for all Avro types. A summary of the rules for record evolution from the point of view of readers and writers (or servers and clients) is presented in Table 4-11.

Table 4-11. Schema resolution of records

New schema | Writer | Reader | Action
Added field | Old | New | The reader uses the default value of the new field, since it is not written by the writer.
Added field | New | Old | The reader does not know about the new field written by the writer, so it is ignored (projection).
Removed field | Old | New | The reader ignores the removed field (projection).
Removed field | New | Old | The removed field is not written by the writer. If the old schema had a default defined for the field, then the reader uses this, otherwise it gets an error. In this case, it is best to update the reader’s schema at the same time as, or before, the writer’s.

114 | Chapter 4: Hadoop I/O
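The record rules in Table 4-11 can be sketched without the Avro library. The class below is a hypothetical illustration, not Avro's implementation: it models a written record as a Map from field name to value, and a reader's schema as a Map from field name to default value, with the sentinel NO_DEFAULT standing in for "no default declared". All class and method names are assumptions made for this example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class RecordResolutionSketch {
    // Sentinel meaning "the reader's schema declares no default for this field".
    static final Object NO_DEFAULT = new Object();

    /**
     * Resolves a written record against the reader's fields.
     * readerFields maps each field name to its default value (or NO_DEFAULT).
     */
    static Map<String, Object> resolve(Map<String, Object> written,
                                       Map<String, Object> readerFields) {
        Map<String, Object> result = new LinkedHashMap<>();
        for (Map.Entry<String, Object> field : readerFields.entrySet()) {
            String name = field.getKey();
            if (written.containsKey(name)) {
                result.put(name, written.get(name));   // field was written: use it
            } else if (field.getValue() != NO_DEFAULT) {
                result.put(name, field.getValue());    // not written: use the default
            } else {
                throw new IllegalStateException("No value or default for field: " + name);
            }
        }
        // Written fields the reader does not declare are never copied (projection).
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> written = new LinkedHashMap<>();
        written.put("left", "L");
        written.put("right", "R");

        // Reader with an added field that has a default value.
        Map<String, Object> newReader = new LinkedHashMap<>();
        newReader.put("left", NO_DEFAULT);
        newReader.put("right", NO_DEFAULT);
        newReader.put("description", "");
        System.out.println(resolve(written, newReader));
    }
}
```

Passing a reader map that lists only right models projection, and a reader map with a new field set to NO_DEFAULT models the error case in the last row of the table.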
Slide 139: Sort order Avro defines a sort order for objects. For most Avro types, the order is the natural one you would expect—for example, numeric types are ordered by ascending numeric value. Others are a little more subtle—enums are compared by the order in which the symbol is defined and not by the value of the symbol string, for instance. All types except record have preordained rules for their sort order as described in the Avro specification; they cannot be overridden by the user. For records, however, you can control the sort order by specifying the order attribute for a field. It takes one of three values: ascending (the default), descending (to reverse the order), or ignore (so the field is skipped for comparison purposes). For example, the following schema (SortedPair.avsc) defines an ordering of Pair records by the right field in descending order. The left field is ignored for the purposes of ordering, but it is still present in the projection: { "type": "record", "name": "Pair", "doc": "A pair of strings, sorted by right field descending.", "fields": [ {"name": "left", "type": "string", "order": "ignore"}, {"name": "right", "type": "string", "order": "descending"} ] } The record’s fields are compared pairwise in the document order of the reader’s schema. Thus, by specifying an appropriate reader’s schema, you can impose an arbitrary ordering on data records. This schema (SwitchedPair.avsc) defines a sort order by the right field, then the left: { "type": "record", "name": "Pair", "doc": "A pair of strings, sorted by right then left.", "fields": [ {"name": "right", "type": "string"}, {"name": "left", "type": "string"} ] } Avro implements efficient binary comparisons. 
That is to say, Avro does not have to deserialize binary data into objects to perform the comparison, since it can instead work directly on the byte streams.# In the case of the original Pair schema (with no order attributes), for example, Avro implements the binary comparison as follows.
# A useful consequence of this property is that you can compute an Avro datum’s hash code from either the object or the binary representation (the latter by using the static hashCode() method on BinaryData) and get the same result in both cases.
Serialization | 115
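A byte-level comparison of two serialized Pair records might look like the following sketch. It is not Avro's own comparator. It assumes each string field is stored as a zigzag varint length followed by the UTF-8 bytes, which is the string encoding given in the Avro specification, and all class and method names are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

class PairBinaryCompareSketch {
    // Appends one string field: zigzag varint length, then the UTF-8 bytes.
    static void writeString(ByteArrayOutputStream out, String s) {
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        long n = (long) utf8.length << 1;           // zigzag form of a non-negative length
        while ((n & ~0x7FL) != 0) {
            out.write((int) ((n & 0x7F) | 0x80));   // low 7 bits with continuation bit
            n >>>= 7;
        }
        out.write((int) n);
        out.write(utf8, 0, utf8.length);
    }

    static byte[] encodePair(String left, String right) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeString(out, left);
        writeString(out, right);
        return out.toByteArray();
    }

    // Reads a zigzag varint length at pos[0] and advances pos[0] past it.
    static int readLength(byte[] buf, int[] pos) {
        long n = 0;
        int shift = 0;
        int b;
        do {
            b = buf[pos[0]++] & 0xFF;
            n |= (long) (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return (int) ((n >>> 1) ^ -(n & 1));        // undo the zigzag step
    }

    // Compares two serialized pairs field by field without creating String objects.
    static int compare(byte[] a, byte[] b) {
        int[] pa = {0};
        int[] pb = {0};
        for (int field = 0; field < 2; field++) {   // left first, then right
            int lenA = readLength(a, pa);
            int lenB = readLength(b, pb);
            int common = Math.min(lenA, lenB);
            for (int i = 0; i < common; i++) {
                int x = a[pa[0] + i] & 0xFF;        // compare as unsigned bytes
                int y = b[pb[0] + i] & 0xFF;
                if (x != y) {
                    return x < y ? -1 : 1;          // order decided, stop here
                }
            }
            if (lenA != lenB) {
                return lenA < lenB ? -1 : 1;        // a proper prefix sorts first
            }
            pa[0] += lenA;
            pb[0] += lenB;
        }
        return 0;
    }

    public static void main(String[] args) {
        System.out.println(compare(encodePair("a", "1"), encodePair("b", "0")));
        System.out.println(compare(encodePair("a", "2"), encodePair("a", "1")));
    }
}
```

The comparison returns as soon as the left fields differ, so the right field is only examined when the left fields are byte-for-byte equal.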
Slide 140: The first field, left, is a UTF-8-encoded string, for which Avro can compare the bytes lexicographically. If they differ, then the order is determined, and Avro can stop the comparison there. Otherwise, if the two byte sequences are the same, it compares the second two (right) fields, again lexicographically at the byte level since the field is another UTF-8 string. Notice that this description of a comparison function has exactly the same logic as the binary comparator we wrote for Writables in “Implementing a RawComparator for speed” on page 99. The great thing is that Avro provides the comparator for us, so we don’t have to write and maintain this code. It’s also easy to change the sort order just by changing the reader’s schema. For the SortedPair.avsc or SwitchedPair.avsc schemas, the comparison function Avro uses is essentially the same as the one just described: the difference is in which fields are considered, the order in which they are considered, and whether the order is ascending or descending. Avro MapReduce Avro provides a number of classes for making it easy to run MapReduce programs on Avro data. For example, AvroMapper and AvroReducer in the org.apache.avro.mapred package are specializations of Hadoop’s (old style) Mapper and Reducer classes. They eliminate the key-value distinction for inputs and outputs, since Avro data files are just a sequence of values. However, intermediate data is still divided into key-value pairs for the shuffle. Avro’s MapReduce integration was being added as this edition went to press, but you can find example code at the website accompanying this book. For languages other than Java, Avro provides a connector framework (in the org.apache.avro.mapred.tether package). At the time of writing, there are no bindings for other languages, but it is expected these will be added in future releases. File-Based Data Structures For some applications, you need a specialized data structure to hold your data. 
For doing MapReduce-based processing, putting each blob of binary data into its own file doesn’t scale, so Hadoop developed a number of higher-level containers for these situations.

SequenceFile

Imagine a logfile, where each log record is a new line of text. If you want to log binary types, plain text isn’t a suitable format. Hadoop’s SequenceFile class fits the bill in this situation, providing a persistent data structure for binary key-value pairs. To use it as a logfile format, you would choose a key, such as a timestamp represented by a LongWritable, and a value that is a Writable representing the quantity being logged.
116 | Chapter 4: Hadoop I/O
Slide 141: SequenceFiles also work well as containers for smaller files. HDFS and MapReduce are optimized for large files, so packing files into a SequenceFile makes storing and processing the smaller files more efficient. (“Processing a whole file as a record” on page 206 contains a program to pack files into a SequenceFile.*)

Writing a SequenceFile

To create a SequenceFile, use one of its createWriter() static methods, which returns a SequenceFile.Writer instance. There are several overloaded versions, but they all require you to specify a stream to write to (either an FSDataOutputStream or a FileSystem and Path pairing), a Configuration object, and the key and value types. Optional arguments include the compression type and codec, a Progressable callback to be informed of write progress, and a Metadata instance to be stored in the SequenceFile header.

The keys and values stored in a SequenceFile do not necessarily need to be Writable. Any types that can be serialized and deserialized by a Serialization may be used.

Once you have a SequenceFile.Writer, you then write key-value pairs, using the append() method. Then when you’ve finished, you call the close() method (SequenceFile.Writer implements java.io.Closeable).

Example 4-12 shows a short program to write some key-value pairs to a SequenceFile, using the API just described.

Example 4-12. Writing a SequenceFile

    import java.io.IOException;
    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.Text;

    public class SequenceFileWriteDemo {

      private static final String[] DATA = {
        "One, two, buckle my shoe",
        "Three, four, shut the door",
        "Five, six, pick up sticks",
        "Seven, eight, lay them straight",
        "Nine, ten, a big fat hen"
      };

      public static void main(String[] args) throws IOException {
        String uri = args[0];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        Path path = new Path(uri);

        IntWritable key = new IntWritable();
        Text value = new Text();
        SequenceFile.Writer writer = null;
        try {
          writer = SequenceFile.createWriter(fs, conf, path,
              key.getClass(), value.getClass());

          for (int i = 0; i < 100; i++) {
            key.set(100 - i);
            value.set(DATA[i % DATA.length]);
            // Print the file position before each record is appended.
            System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key, value);
            writer.append(key, value);
          }
        } finally {
          IOUtils.closeStream(writer);
        }
      }
    }

* In a similar vein, the blog post “A Million Little Files” by Stuart Sierra includes code for converting a tar file into a SequenceFile, http://stuartsierra.com/2008/04/24/a-million-little-files.
File-Based Data Structures | 117
Slide 142: The keys in the sequence file are integers counting down from 100 to 1, represented as IntWritable objects. The values are Text objects. Before each record is appended to the SequenceFile.Writer, we call the getLength() method to discover the current position in the file. (We will use this information about record boundaries in the next section when we read the file nonsequentially.) We write the position out to the console, along with the key and value pairs. The result of running it is shown here:

    % hadoop SequenceFileWriteDemo numbers.seq
    [128] 100 One, two, buckle my shoe
    [173] 99 Three, four, shut the door
    [220] 98 Five, six, pick up sticks
    [264] 97 Seven, eight, lay them straight
    [314] 96 Nine, ten, a big fat hen
    [359] 95 One, two, buckle my shoe
    [404] 94 Three, four, shut the door
    [451] 93 Five, six, pick up sticks
    [495] 92 Seven, eight, lay them straight
    [545] 91 Nine, ten, a big fat hen
    ...
    [1976] 60 One, two, buckle my shoe
    [2021] 59 Three, four, shut the door
    [2088] 58 Five, six, pick up sticks
    [2132] 57 Seven, eight, lay them straight
    [2182] 56 Nine, ten, a big fat hen
    ...
    [4557] 5 One, two, buckle my shoe
    [4602] 4 Three, four, shut the door
    [4649] 3 Five, six, pick up sticks
    [4693] 2 Seven, eight, lay them straight
    [4743] 1 Nine, ten, a big fat hen

Reading a SequenceFile

Reading sequence files from beginning to end is a matter of creating an instance of SequenceFile.Reader and iterating over records by repeatedly invoking one of the next() methods. Which one you use depends on the serialization framework you are using. If you are using Writable types, you can use the next() method that takes a key
118 | Chapter 4: Hadoop I/O
Slide 143: and a value argument, and reads the next key and value in the stream into these variables:

    public boolean next(Writable key, Writable val)

The return value is true if a key-value pair was read and false if the end of the file has been reached.

For other, non-Writable serialization frameworks (such as Apache Thrift), you should use these two methods:

    public Object next(Object key) throws IOException
    public Object getCurrentValue(Object val) throws IOException

In this case, you need to make sure that the serialization you want to use has been set in the io.serializations property; see “Serialization Frameworks” on page 101. If the next() method returns a non-null object, a key-value pair was read from the stream, and the value can be retrieved using the getCurrentValue() method. Otherwise, if next() returns null, the end of the file has been reached.

The program in Example 4-13 demonstrates how to read a sequence file that has Writable keys and values. Note how the types are discovered from the SequenceFile.Reader via calls to getKeyClass() and getValueClass(), then ReflectionUtils is used to create an instance for the key and an instance for the value. By using this technique, the program can be used with any sequence file that has Writable keys and values.

Example 4-13. Reading a SequenceFile

    import java.io.IOException;
    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.util.ReflectionUtils;

    public class SequenceFileReadDemo {

      public static void main(String[] args) throws IOException {
        String uri = args[0];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        Path path = new Path(uri);

        SequenceFile.Reader reader = null;
        try {
          reader = new SequenceFile.Reader(fs, path, conf);
          // Instantiate key and value objects of whatever types the file declares.
          Writable key = (Writable)
              ReflectionUtils.newInstance(reader.getKeyClass(), conf);
          Writable value = (Writable)
              ReflectionUtils.newInstance(reader.getValueClass(), conf);
          long position = reader.getPosition();
          while (reader.next(key, value)) {
            String syncSeen = reader.syncSeen() ? "*" : "";
            System.out.printf("[%s%s]\t%s\t%s\n", position, syncSeen, key, value);
            position = reader.getPosition(); // beginning of next record
          }
        } finally {
          IOUtils.closeStream(reader);
        }
      }
    }

File-Based Data Structures | 119
Slide 144: Another feature of the program is that it displays the position of the sync points in the sequence file. A sync point is a point in the stream that can be used to resynchronize with a record boundary if the reader is “lost” (for example, after seeking to an arbitrary position in the stream). Sync points are recorded by SequenceFile.Writer, which inserts a special entry to mark the sync point every few records as a sequence file is being written. Such entries are small enough to incur only a modest storage overhead (less than 1%). Sync points always align with record boundaries.

Running the program in Example 4-13 shows the sync points in the sequence file as asterisks. The first one occurs at position 2021 (the second one occurs at position 4075, but is not shown in the output):

    % hadoop SequenceFileReadDemo numbers.seq
    [128] 100 One, two, buckle my shoe
    [173] 99 Three, four, shut the door
    [220] 98 Five, six, pick up sticks
    [264] 97 Seven, eight, lay them straight
    [314] 96 Nine, ten, a big fat hen
    [359] 95 One, two, buckle my shoe
    [404] 94 Three, four, shut the door
    [451] 93 Five, six, pick up sticks
    [495] 92 Seven, eight, lay them straight
    [545] 91 Nine, ten, a big fat hen
    [590] 90 One, two, buckle my shoe
    ...
    [1976] 60 One, two, buckle my shoe
    [2021*] 59 Three, four, shut the door
    [2088] 58 Five, six, pick up sticks
    [2132] 57 Seven, eight, lay them straight
    [2182] 56 Nine, ten, a big fat hen
    ...
    [4557] 5 One, two, buckle my shoe
    [4602] 4 Three, four, shut the door
    [4649] 3 Five, six, pick up sticks
    [4693] 2 Seven, eight, lay them straight
    [4743] 1 Nine, ten, a big fat hen

There are two ways to seek to a given position in a sequence file. The first is the seek() method, which positions the reader at the given point in the file. 
For example, seeking to a record boundary works as expected: reader.seek(359); assertThat(reader.next(key, value), is(true)); assertThat(((IntWritable) key).get(), is(95)); But if the position in the file is not at a record boundary, the reader fails when the next() method is called: reader.seek(360); reader.next(key, value); // fails with IOException 120 | Chapter 4: Hadoop I/O
Slide 145: The second way to find a record boundary makes use of sync points. The sync(long position) method on SequenceFile.Reader positions the reader at the next sync point after position. (If there are no sync points in the file after this position, then the reader will be positioned at the end of the file.) Thus, we can call sync() with any position in the stream—a nonrecord boundary, for example—and the reader will reestablish itself at the next sync point so reading can continue: reader.sync(360); assertThat(reader.getPosition(), is(2021L)); assertThat(reader.next(key, value), is(true)); assertThat(((IntWritable) key).get(), is(59)); SequenceFile.Writer has a method called sync() for inserting a sync point at the current position in the stream. This is not to be confused with the identically named but otherwise unrelated sync() method defined by the Syncable interface for synchronizing buffers to the underlying device. Sync points come into their own when using sequence files as input to MapReduce, since they permit the file to be split, so different portions of it can be processed independently by separate map tasks. See “SequenceFileInputFormat” on page 213. Displaying a SequenceFile with the command-line interface The hadoop fs command has a -text option to display sequence files in textual form. It looks at a file’s magic number so that it can attempt to detect the type of the file and appropriately convert it to text. It can recognize gzipped files and sequence files; otherwise, it assumes the input is plain text. For sequence files, this command is really useful only if the keys and values have a meaningful string representation (as defined by the toString() method). Also, if you have your own key or value classes, then you will need to make sure they are on Hadoop’s classpath. 
Running it on the sequence file we created in the previous section gives the following output: % hadoop fs -text numbers.seq | head 100 One, two, buckle my shoe 99 Three, four, shut the door 98 Five, six, pick up sticks 97 Seven, eight, lay them straight 96 Nine, ten, a big fat hen 95 One, two, buckle my shoe 94 Three, four, shut the door 93 Five, six, pick up sticks 92 Seven, eight, lay them straight 91 Nine, ten, a big fat hen File-Based Data Structures | 121
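The magic-number check that the -text option relies on can be sketched in a few lines. This is an illustration under stated assumptions, not the code Hadoop uses: it assumes the two-byte gzip signature 0x1F 0x8B and the three-byte SEQ prefix of a sequence file, and the class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;

class MagicNumberSketch {
    // Guesses a file type from the first few bytes of the file.
    static String detect(byte[] head) {
        if (head.length >= 2
                && (head[0] & 0xFF) == 0x1F
                && (head[1] & 0xFF) == 0x8B) {
            return "gzip";
        }
        if (head.length >= 3
                && head[0] == 'S' && head[1] == 'E' && head[2] == 'Q') {
            return "sequence file";
        }
        return "plain text";                      // fallback when nothing matches
    }

    public static void main(String[] args) {
        byte[] seq = "SEQ".getBytes(StandardCharsets.US_ASCII);
        System.out.println(detect(seq));
        System.out.println(detect("hello".getBytes(StandardCharsets.US_ASCII)));
    }
}
```

A text file that happens to start with the letters SEQ would be misclassified by a check like this, which is one reason the detection is described as an attempt.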
Slide 146: Sorting and merging SequenceFiles

The most powerful way of sorting (and merging) one or more sequence files is to use MapReduce. MapReduce is inherently parallel and will let you specify the number of reducers to use, which determines the number of output partitions. For example, by specifying one reducer, you get a single output file. We can use the sort example that comes with Hadoop by specifying that the input and output are sequence files, and by setting the key and value types:

    % hadoop jar $HADOOP_INSTALL/hadoop-*-examples.jar sort -r 1 \
      -inFormat org.apache.hadoop.mapred.SequenceFileInputFormat \
      -outFormat org.apache.hadoop.mapred.SequenceFileOutputFormat \
      -outKey org.apache.hadoop.io.IntWritable \
      -outValue org.apache.hadoop.io.Text \
      numbers.seq sorted
    % hadoop fs -text sorted/part-00000 | head
    1 Nine, ten, a big fat hen
    2 Seven, eight, lay them straight
    3 Five, six, pick up sticks
    4 Three, four, shut the door
    5 One, two, buckle my shoe
    6 Nine, ten, a big fat hen
    7 Seven, eight, lay them straight
    8 Five, six, pick up sticks
    9 Three, four, shut the door
    10 One, two, buckle my shoe

Sorting is covered in more detail in “Sorting” on page 232.

As an alternative to using MapReduce for sort/merge, there is a SequenceFile.Sorter class that has a number of sort() and merge() methods. These functions predate MapReduce and are lower-level functions than MapReduce (for example, to get parallelism, you need to partition your data manually), so in general MapReduce is the preferred approach to sort and merge sequence files.

The SequenceFile format

A sequence file consists of a header followed by one or more records (see Figure 4-2). The first three bytes of a sequence file are the bytes SEQ, which act as a magic number, followed by a single byte representing the version number. 
The header contains other fields including the names of the key and value classes, compression details, user-defined metadata, and the sync marker.† Recall that the sync marker is used to allow a reader to synchronize to a record boundary from any position in the file. Each file has a randomly generated sync marker, whose value is stored in the header. Sync markers appear between records in the sequence file. They are designed to incur less than a 1% storage overhead, so they don’t necessarily appear between every pair of records (such is the case for short records).
† Full details of the format of these fields may be found in SequenceFile’s documentation and source code.
122 | Chapter 4: Hadoop I/O
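Resynchronizing on a sync marker amounts to scanning forward for the marker bytes. The sketch below assumes the file contents are already in a byte array and that a sync entry consists of the marker alone; the real format carries additional framing that is ignored here, and the three-byte marker in main is only for readability. The class and method names are hypothetical.

```java
class SyncScanSketch {
    /**
     * Returns the offset of the first occurrence of marker that starts at or
     * after 'from', or data.length (end of file) if there is none.
     */
    static int sync(byte[] data, byte[] marker, int from) {
        for (int start = Math.max(from, 0);
             start + marker.length <= data.length;
             start++) {
            boolean match = true;
            for (int i = 0; i < marker.length; i++) {
                if (data[start + i] != marker[i]) {
                    match = false;
                    break;
                }
            }
            if (match) {
                return start;                     // reader is now at a sync point
            }
        }
        return data.length;                       // no later sync point
    }

    public static void main(String[] args) {
        byte[] marker = {9, 9, 9};
        byte[] data = {1, 2, 9, 9, 9, 3, 4, 9, 9, 9, 5};
        // Start from an arbitrary offset that is not a record boundary.
        System.out.println(sync(data, marker, 3));
    }
}
```

Because the marker is random and specific to the file, a long marker makes it very unlikely that ordinary record bytes are mistaken for a sync point.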
Slide 147: Figure 4-2. The internal structure of a sequence file with no compression and record compression The internal format of the records depends on whether compression is enabled, and if it is, whether it is record compression or block compression. If no compression is enabled (the default), then each record is made up of the record length (in bytes), the key length, the key, and then the value. The length fields are written as four-byte integers adhering to the contract of the writeInt() method of java.io.DataOutput. Keys and values are serialized using the Serialization defined for the class being written to the sequence file. The format for record compression is almost identical to no compression, except the value bytes are compressed using the codec defined in the header. Note that keys are not compressed. Block compression compresses multiple records at once; it is therefore more compact than and should generally be preferred over record compression because it has the opportunity to take advantage of similarities between records. (See Figure 4-3.) Records are added to a block until it reaches a minimum size in bytes, defined by the io.seqfile.compress.blocksize property: the default is 1 million bytes. A sync marker is written before the start of every block. The format of a block is a field indicating the number of records in the block, followed by four compressed fields: the key lengths, the keys, the value lengths, and the values. MapFile A MapFile is a sorted SequenceFile with an index to permit lookups by key. MapFile can be thought of as a persistent form of java.util.Map (although it doesn’t implement this interface), which is able to grow beyond the size of a Map that is kept in memory. File-Based Data Structures | 123
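The uncompressed record layout described above (record length, key length, key, value) can be sketched with java.io.DataOutputStream, whose writeInt() writes the four-byte integers mentioned in the text. The sketch assumes the record length counts the key bytes plus the value bytes and treats keys and values as opaque byte arrays; it leaves out the header and sync markers. All names are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

class RecordLayoutSketch {
    // Writes: record length, key length, key bytes, value bytes.
    static byte[] encode(byte[] key, byte[] value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(key.length + value.length); // assumed: key plus value bytes
            out.writeInt(key.length);
            out.write(key);
            out.write(value);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads one record back and returns {key, value}.
    static byte[][] decode(byte[] record) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(record));
            int recordLength = in.readInt();
            int keyLength = in.readInt();
            byte[] key = new byte[keyLength];
            in.readFully(key);
            byte[] value = new byte[recordLength - keyLength];
            in.readFully(value);
            return new byte[][] {key, value};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] record = encode(new byte[] {0, 0, 0, 100}, new byte[] {1, 2, 3});
        System.out.println(record.length);
    }
}
```

Storing the key length separately lets a reader locate the value without understanding how the key itself is serialized.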
Slide 148: Figure 4-3. The internal structure of a sequence file with block compression

Writing a MapFile

Writing a MapFile is similar to writing a SequenceFile: you create an instance of MapFile.Writer, then call the append() method to add entries in order. (Attempting to add entries out of order will result in an IOException.) Keys must be instances of WritableComparable, and values must be Writable; contrast this with SequenceFile, which can use any serialization framework for its entries.

The program in Example 4-14 creates a MapFile, and writes some entries to it. It is very similar to the program in Example 4-12 for creating a SequenceFile.

Example 4-14. Writing a MapFile

    import java.io.IOException;
    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.MapFile;
    import org.apache.hadoop.io.Text;

    public class MapFileWriteDemo {

      private static final String[] DATA = {
        "One, two, buckle my shoe",
        "Three, four, shut the door",
        "Five, six, pick up sticks",
        "Seven, eight, lay them straight",
        "Nine, ten, a big fat hen"
      };

      public static void main(String[] args) throws IOException {
        String uri = args[0];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);

        // Key and value objects are reused for every entry.
        IntWritable key = new IntWritable();
        Text value = new Text();
        MapFile.Writer writer = null;
        try {
          writer = new MapFile.Writer(conf, fs, uri,
              key.getClass(), value.getClass());

          // Keys 1..1024 are appended in ascending order, as MapFile requires.
          for (int i = 0; i < 1024; i++) {
            key.set(i + 1);
            value.set(DATA[i % DATA.length]);
            writer.append(key, value);
          }
        } finally {
          IOUtils.closeStream(writer);
        }
      }
    }

Slide 149: Let’s use this program to build a MapFile:

    % hadoop MapFileWriteDemo numbers.map

If we look at the MapFile, we see it’s actually a directory containing two files called data and index:

    % ls -l numbers.map
    total 104
    -rw-r--r--  1 tom tom 47898 Jul 29 22:06 data
    -rw-r--r--  1 tom tom   251 Jul 29 22:06 index

Both files are SequenceFiles. The data file contains all of the entries, in order:

    % hadoop fs -text numbers.map/data | head
    1    One, two, buckle my shoe
    2    Three, four, shut the door
    3    Five, six, pick up sticks
    4    Seven, eight, lay them straight
    5    Nine, ten, a big fat hen
    6    One, two, buckle my shoe
    7    Three, four, shut the door
    8    Five, six, pick up sticks
    9    Seven, eight, lay them straight
    10   Nine, ten, a big fat hen

The index file contains a fraction of the keys, and contains a mapping from the key to that key’s offset in the data file:

    % hadoop fs -text numbers.map/index
    1     128
    129   6079
    257   12054
    385   18030
    513   24002
    641   29976
    769   35947
    897   41922

As we can see from the output, by default only every 128th key is included in the index, although you can change this value either by setting the io.map.index.interval property or by calling the setIndexInterval() method on the MapFile.Writer instance. A reason to increase the index interval would be to decrease the amount of memory that the MapFile needs to store the index. Conversely, you might decrease the interval to improve the time for random selection (since fewer records need to be skipped on average) at the expense of memory usage.
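The effect of the index interval can be sketched without Hadoop at all. The following plain-Java model (the class and method names are hypothetical, and it tracks only which keys are indexed, not their byte offsets) keeps the first key and then every interval-th key after it, which reproduces the index keys listed above for the 1,024 entries written by the demo program.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative model of a sparse index; not the MapFile.Writer implementation.
class SparseIndexSketch {

    // Keeps the first key and then every interval-th key after it.
    static List<Integer> indexKeys(int[] sortedKeys, int interval) {
        if (interval <= 0) {
            throw new IllegalArgumentException("interval must be positive");
        }
        List<Integer> index = new ArrayList<>();
        for (int i = 0; i < sortedKeys.length; i += interval) {
            index.add(sortedKeys[i]);
        }
        return index;
    }

    public static void main(String[] args) {
        int[] keys = new int[1024];
        for (int i = 0; i < keys.length; i++) {
            keys[i] = i + 1;                       // the same keys the demo program writes
        }
        System.out.println(indexKeys(keys, 128));  // default interval
        System.out.println(indexKeys(keys, 256));  // larger interval, smaller index
    }
}
```

Doubling the interval halves the number of index entries held in memory, at the cost of a longer scan between an indexed key and the key being looked up.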
Slide 150: Since the index is only a partial index of keys, MapFile is not able to provide methods to enumerate, or even count, all the keys it contains. The only way to perform these operations is to read the whole file.

Reading a MapFile

Iterating through the entries in order in a MapFile is similar to the procedure for a SequenceFile: you create a MapFile.Reader, then call the next() method until it returns false, signifying that no entry was read because the end of the file was reached:

    public boolean next(WritableComparable key, Writable val) throws IOException

A random access lookup can be performed by calling the get() method:

    public Writable get(WritableComparable key, Writable val) throws IOException

The return value is used to determine if an entry was found in the MapFile; if it’s null, then no value exists for the given key. If the key was found, then the value for that key is read into val, as well as being returned from the method call.

It might be helpful to understand how this is implemented. Here is a snippet of code that retrieves an entry for the MapFile we created in the previous section:

    Text value = new Text();
    reader.get(new IntWritable(496), value);
    assertThat(value.toString(), is("One, two, buckle my shoe"));

For this operation, the MapFile.Reader reads the index file into memory (this is cached so that subsequent random access calls will use the same in-memory index). The reader then performs a binary search on the in-memory index to find the key in the index that is less than or equal to the search key, 496. In this example, the index key found is 385, with value 18030, which is the offset in the data file. Next the reader seeks to this offset in the data file and reads entries until the key is greater than or equal to the search key, 496. In this case, a match is found and the value is read from the data file. Overall, a lookup takes a single disk seek and a scan through up to 128 entries on disk.
For a random-access read, this is actually very efficient.

The getClosest() method is like get() except it returns the “closest” match to the specified key, rather than returning null on no match. More precisely, if the MapFile contains the specified key, then that is the entry returned; otherwise, the key in the MapFile that is immediately after (or before, according to a boolean argument) the specified key is returned.

A very large MapFile’s index can take up a lot of memory. Rather than reindex to change the index interval, it is possible to load only a fraction of the index keys into memory when reading the MapFile by setting the io.map.index.skip property. This property is normally 0, which means no index keys are skipped; a value of 1 means skip one key for every key in the index (so every other key ends up in the index), 2 means skip two keys for every key in the index (so one third of the keys end up in the index), and so on.
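The lookup procedure and the effect of io.map.index.skip can be modeled together in a few lines of plain Java. This is a sketch under stated assumptions, not the MapFile.Reader code: the class is hypothetical, an int array stands in for the sorted data file, array positions stand in for byte offsets, and the choice to keep the first index entry of each group (rather than a later one) is an assumption made so that every key has a preceding index entry.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative model of a MapFile-style lookup; not the MapFile.Reader code.
class SparseLookupSketch {
    private final int[] dataKeys;  // stands in for the sorted data file
    private final List<Integer> indexPositions = new ArrayList<>(); // in-memory index

    SparseLookupSketch(int[] dataKeys, int interval, int skip) {
        if (interval <= 0 || skip < 0) {
            throw new IllegalArgumentException("interval must be > 0 and skip >= 0");
        }
        this.dataKeys = dataKeys;
        int indexEntry = 0;
        for (int pos = 0; pos < dataKeys.length; pos += interval, indexEntry++) {
            // Assumption: keep one index entry, then skip the next `skip` entries.
            if (indexEntry % (skip + 1) == 0) {
                indexPositions.add(pos);
            }
        }
    }

    // Returns how many data entries are scanned to find the key, or -1 if absent.
    int entriesScanned(int searchKey) {
        // Binary search for the last index entry whose key is <= searchKey.
        int lo = 0, hi = indexPositions.size() - 1, start = -1;
        while (lo <= hi) {
            int mid = (lo + hi) >>> 1;
            if (dataKeys[indexPositions.get(mid)] <= searchKey) {
                start = indexPositions.get(mid);
                lo = mid + 1;
            } else {
                hi = mid - 1;
            }
        }
        if (start < 0) {
            return -1;  // the search key precedes the first entry
        }
        // "Seek" to the indexed position, then scan forward through the data.
        int scanned = 0;
        for (int pos = start; pos < dataKeys.length; pos++) {
            scanned++;
            if (dataKeys[pos] == searchKey) return scanned;
            if (dataKeys[pos] > searchKey) return -1;
        }
        return -1;
    }

    public static void main(String[] args) {
        int[] keys = new int[1024];
        for (int i = 0; i < keys.length; i++) keys[i] = i + 1;
        for (int skip = 0; skip <= 2; skip++) {
            SparseLookupSketch reader = new SparseLookupSketch(keys, 128, skip);
            System.out.println("skip=" + skip + " scans "
                + reader.entriesScanned(496) + " entries to find key 496");
        }
    }
}
```

In this model, a larger skip value shrinks the in-memory index but can lengthen the scan that follows the seek, which is the memory-for-time trade the property offers.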

   