Sunday, February 20, 2011

Increase your Swap partition

Build error with Mahout


Interestingly enough, with RAM and swap sizes as low as 1GB, you may run into the exact same issue during the build of the Mahout project from source as the one described in this previous post, HBase memory issue.

$ svn co http://svn.apache.org/repos/asf/mahout/trunk mahout
$ cd mahout
$ mvn
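
As an aside, if you only need the build artifacts, the failing tests can be skipped with Maven's standard Surefire flag, though that only hides the underlying problem described next:

$ mvn -DskipTests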

From core/target/surefire-reports/org.apache.mahout.cf.taste.hadoop.item.RecommenderJobTest.txt, I had the same error:


-------------------------------------------------------------------------------
Test set: org.apache.mahout.cf.taste.hadoop.item.RecommenderJobTest
-------------------------------------------------------------------------------
Tests run: 21, Failures: 0, Errors: 2, Skipped: 0, Time elapsed: 33.882 sec <<< FAILURE!
testCompleteJobBoolean(org.apache.mahout.cf.taste.hadoop.item.RecommenderJobTest)  Time elapsed: 15.717 sec  <<< ERROR!
java.io.IOException: Cannot run program "chmod": java.io.IOException: error=12, Cannot allocate memory
        at java.lang.ProcessBuilder.start(ProcessBuilder.java:460)
        at org.apache.hadoop.util.Shell.runCommand(Shell.java:149)
        at org.apache.hadoop.util.Shell.run(Shell.java:134)
        at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:286)
        at org.apache.hadoop.util.Shell.execCommand(Shell.java:354)
        at org.apache.hadoop.util.Shell.execCommand(Shell.java:337)
        at org.apache.hadoop.fs.RawLocalFileSystem.execCommand(RawLocalFileSystem.java:481)
        at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:473)
        at org.apache.hadoop.fs.FilterFileSystem.setPermission(FilterFileSystem.java:280)
        at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:266)
        at org.apache.hadoop.mapred.JobClient.configureCommandLineOptions(JobClient.java:573)
        at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:761)
        at org.apache.hadoop.mapreduce.Job.submit(Job.java:432)
        at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:447)
        at org.apache.mahout.cf.taste.hadoop.item.RecommenderJob.run(RecommenderJob.java:234)



As mentioned here, you should increase the size of your swap partition. The swap partition was not big enough for the system to fork the Java process while simultaneously keeping the memory pages required by the other running applications.
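
If repartitioning is not an option, a commonly cited workaround (not what I did here) is to relax the kernel's memory overcommit accounting, so that forking the large JVM process does not require reserving a full copy of its address space:

# sysctl -w vm.overcommit_memory=1    # always overcommit; use with care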

Increase your Swap partition



To get more swap, the quick option is to create a swap file (see the sketch below). A cleaner way is to change your partition table: shrink one partition to free some space, then grow the swap one. Make sure you know what you are doing here: read the Partition guide carefully and back up your personal files before changing anything. In my case, I got lucky: I was able to shrink an unimportant primary partition (/dev/sda2) and give the extra space to the swap area (the logical partition /dev/sda8) located in the extended partition (/dev/sda1). The former partition table had the same amount of swap as RAM, 1GB; the new one has twice as much, 2GB.
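
For the record, the swap file route only takes standard tools (a sketch, assuming /swapfile as the path and 1GB as the size):

# dd if=/dev/zero of=/swapfile bs=1M count=1024   # allocate a 1GB file
# chmod 600 /swapfile                             # restrict permissions
# mkswap /swapfile                                # format it as swap
# swapon /swapfile                                # enable it immediately

This was the old partition table: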


# fdisk /dev/sda

WARNING: DOS-compatible mode is deprecated. It's strongly recommended to
         switch off the mode (command 'c') and change display units to
         sectors (command 'u').

Command (m for help): p

Disk /dev/sda: 58.5 GB, 58506416640 bytes
255 heads, 63 sectors/track, 7113 cylinders
Units = cylinders of 16065 * 512 = 8225280 bytes
Sector size (logical/physical): 512 bytes / 512 bytes
I/O size (minimum/optimal): 512 bytes / 512 bytes
Disk identifier: 0xcfce28df

   Device Boot      Start         End      Blocks   Id  System
/dev/sda1               1        3649    29310561    5  Extended
/dev/sda2   *        3650        7112    27816547+   7  HPFS/NTFS
/dev/sda5               1         973     7815559+  83  Linux
/dev/sda6             974        1095      979933+  83  Linux
/dev/sda7            1096        3527    19535008+  83  Linux
/dev/sda8            3528        3649      979933+  82  Linux swap / Solaris


/dev/sda8 got assigned the cylinders 3528 to 3649, i.e. 122 cylinders. Its size is 122 * 8225280 / 1024 = 979,965 kB (~1 GB), which matches the 979933+ blocks reported above.


To perform the modification, I followed these steps:

  1. Count the number of additional cylinders required for the swap partition (121)
  2. Run fdisk /dev/sda
  3. Shrink the primary partition /dev/sda2 : delete it (d), add it again (n) with fewer cylinders (its starting cylinder is now 3771 instead of 3650) and write the partition table to disk (w).
  4. Run kpartx /dev/sda
  5. Expand the extended partition /dev/sda1 : (d) then (n), with more cylinders (its ending cylinder is 3770 instead of 3649). Recreate the logical partitions /dev/sda5, /dev/sda6 and /dev/sda7 ((n) three times with the same start/end cylinders as before), and finally /dev/sda8 with more cylinders (its ending cylinder is now 3770 instead of 3649). Assign 82 as the system id to /dev/sda8 to tag it as "Linux swap / Solaris" (t). Then (w).
  6. Run kpartx /dev/sda again
  7. Reformat the modified partitions:

Format the /dev/sda2 partition with an ext3 filesystem. You will lose everything on it.

# mke2fs -j /dev/sda2

Format /dev/sda8 as swap:

# swapoff /dev/sda8
# mkswap -f /dev/sda8
# swapon /dev/sda8
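
One caveat: mkswap writes a new signature (and UUID) to the partition, so if your /etc/fstab references the swap area by UUID, update that entry. Referencing the device path avoids the problem; a minimal fstab line would look like this (a sketch, matching the partition above):

/dev/sda8    none    swap    sw    0    0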


This is the new partition table:


   Device Boot      Start         End      Blocks   Id  System
/dev/sda1               1        3770    30282493+   5  Extended
/dev/sda2            3771        7113    26852647+  83  Linux
/dev/sda5               1         973     7815559+  83  Linux
/dev/sda6             974        1095      979933+  83  Linux
/dev/sda7            1096        3527    19535008+  83  Linux
/dev/sda8            3528        3770     1951866   82  Linux swap / Solaris
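
As a sanity check, /dev/sda8 now spans cylinders 3528 to 3770, i.e. 243 cylinders: 243 * 8225280 / 1024 ≈ 1,951,897 kB, consistent with the 1,951,866 blocks reported above (~2 GB, twice the RAM).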




With the top command, you should see the new total amount of swap available.


top - 09:45:16 up  1:48,  6 users,  load average: 0.01, 0.04, 0.01
Tasks: 139 total,   1 running, 138 sleeping,   0 stopped,   0 zombie
Cpu(s):  1.0%us,  1.0%sy,  0.0%ni, 96.8%id,  0.0%wa,  0.7%hi,  0.5%si,  0.0%st
Mem:   1026468k total,   668960k used,   357508k free,    22036k buffers
Swap:  1951856k total,   168876k used,  1782980k free,   438632k cached
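
You can cross-check the same totals with swapon and free:

# swapon -s    # lists active swap areas with their sizes in kB
# free -k      # memory and swap totals in kB, matching top's summary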


Conclusion


Run the build again in the Mahout directory with the mvn command. It should now succeed.

Sunday, February 6, 2011

Gora, an ORM framework for Hadoop jobs

Table of Contents
Introduction
Background
Gora development in Eclipse
I/O Frequency
Cassandra in Gora
  gora-cassandra module
  Avro schema
References



Introduction


Lately I have been focusing on rewriting the whole Cassandra stack in Gora, for GORA-22. It was hinted that it needed to be revamped due to some concurrency issues. I first tried to port the old backend by updating the API calls to make it compatible with Cassandra 0.8. I could not make it work, so I just rewrote the module from scratch. Instead of rewriting the entire Thrift layer, we now delegate Cassandra read/write operations to Hector, the first Cassandra client listed on the official wiki.

Background


Here is some Gora background, from what I understand.

Nutch performs a web crawl by running iterations of generate/fetch/parse/updatedb steps implemented as Hadoop jobs. To access the data, it relies on Gora, which is actually an ORM framework (Object-Relational Mapping, a bit like ActiveRecord in Rails), instead of manipulating segments as it did previously.

The gora-mapreduce module intends to abstract away data access within Map/Reduce. It replaces the data storage that is usually done through HDFS files (hadoop-hdfs). Instead, you are given the ability to query your data from a database, either row-oriented (RDBMS such as MySQL or HSQL) or column-oriented (NoSQL stores such as HBase or Cassandra).

This of course has an impact on performance, by adding network overhead when the mappers need to connect to a centralized remote server instead of reading distributed files from the cluster. It also gives up a few intrinsic HDFS features, such as data recovery through replication (connection failures) or speedup through network topology analysis. I am not quite aware of all the implications of using Gora here, so please don't hesitate to share your own impressions.


Gora development in Eclipse


Set up a Gora project the same way as Nutch.
To resolve dependencies, we can use either the Maven m2eclipse plugin or the IvyDE plugin.


If you want to use IvyDE, you should replace every occurrence of ${project.dir} with ${basedir}/trunk (assuming trunk is the local directory containing the Gora checkout), as well as comment out the gora-core dependency in gora-cassandra/ivy/ivy.xml, to make it work in Eclipse. I have no idea how to load such a project.dir property in the IvyDE Eclipse plugin.

Index: gora-core/ivy/ivy.xml
===================================================================
--- gora-core/ivy/ivy.xml       (revision 1149427)
+++ gora-core/ivy/ivy.xml       (working copy)
@@ -23,7 +23,7 @@
       status="integration"/>
 
   <configurations>
-    <include file="${project.dir}/ivy/ivy-configurations.xml"/>
+    <include file="${basedir}/trunk/ivy/ivy-configurations.xml"/>
   </configurations>
 
   <publications defaultconf="compile">
@@ -44,10 +44,11 @@
       <exclude org="org.eclipse.jdt" name="core"/>
       <exclude org="org.mortbay.jetty" name="jsp-*"/>
     </dependency>
+    
     <dependency org="org.apache.hadoop" name="avro" rev="1.3.2" conf="*->default">
       <exclude org="ant" name="ant"/>
     </dependency>
-
+    
     <!-- test dependencies -->
     <dependency org="org.apache.hadoop" name="hadoop-test" rev="0.20.2" conf="test->master"/>
     <dependency org="org.slf4j" name="slf4j-simple" rev="1.5.8" conf="test -> *,!sources,!javadoc"/>
Index: gora-sql/ivy/ivy.xml
===================================================================
--- gora-sql/ivy/ivy.xml        (revision 1149427)
+++ gora-sql/ivy/ivy.xml        (working copy)
@@ -23,7 +23,7 @@
       status="integration"/>
 
   <configurations>
-    <include file="${project.dir}/ivy/ivy-configurations.xml"/>
+    <include file="${basedir}/trunk/ivy/ivy-configurations.xml"/>
   </configurations>
   
   <publications>
@@ -33,12 +33,13 @@
 
   <dependencies>
     <!-- conf="*->@" means every conf is mapped to the conf of the same name of the artifact-->
-    <dependency org="org.apache.gora" name="gora-core" rev="latest.integration" changing="true" conf="*->@"/> 
+    <dependency org="org.apache.gora" name="gora-core" rev="latest.integration" changing="true" conf="*->@"/>
     <dependency org="org.jdom" name="jdom" rev="1.1" conf="*->master"/>
     <dependency org="com.healthmarketscience.sqlbuilder" name="sqlbuilder" rev="2.0.6" conf="*->default"/>
 
     <!-- test dependencies -->
     <dependency org="org.hsqldb" name="hsqldb" rev="2.0.0" conf="test->default"/>
+    <dependency org="mysql" name="mysql-connector-java" rev="5.1.13" conf="*->default"/>
 
   </dependencies>
     
Index: gora-cassandra/ivy/ivy.xml
===================================================================
--- gora-cassandra/ivy/ivy.xml  (revision 1149427)
+++ gora-cassandra/ivy/ivy.xml  (working copy)
@@ -24,7 +24,7 @@
       status="integration"/>
       
   <configurations>
-    <include file="${project.dir}/ivy/ivy-configurations.xml"/>
+    <include file="${basedir}/trunk/ivy/ivy-configurations.xml"/>
   </configurations>
   
   <publications>
@@ -35,9 +35,9 @@
   
   <dependencies>
     <!-- conf="*->@" means every conf is mapped to the conf of the same name of the artifact-->
-    
+    <!--
     <dependency org="org.apache.gora" name="gora-core" rev="latest.integration" changing="true" conf="*->@"/>
-    
+    -->
     <dependency org="org.jdom" name="jdom" rev="1.1">
        <exclude org="xerces" name="xercesImpl"/>
     </dependency>


This is what the Gora project looks like:




In the Nutch project, we want to comment out the gora-core, gora-hbase, gora-cassandra and gora-mysql dependencies in Ivy, since they are already loaded by the Gora project included in the Nutch Java Build Path. We then add Gora as a project dependency in the Java Build Path.




That way, we can update the Gora code and test it on the fly by running the Nutch classes.

I/O Frequency


By default, the records shuffled in the Hadoop job are buffered in memory. At some point, you want to flush the buffer and write the records to the actual database. Similarly, you cannot load the entire content of the database before starting the mappers, so you need to limit the number of records you read at a time.


People who write Hadoop jobs might be familiar with the mapred-site.xml configuration file, even if they do not necessarily use Gora together with Nutch. You can override the default number of rows fetched per select query for read operations, and the default number of records buffered in memory before being flushed for write operations. The default is 10000.

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
 <property>
  <name>gora.buffer.read.limit</name>
  <value>500</value>
 </property> 
 <property>
  <name>gora.buffer.write.limit</name>
  <value>100</value>
 </property>
</configuration>
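
The same limits can also be set programmatically on the job's Configuration, using the property names above (a sketch):

import org.apache.hadoop.conf.Configuration;

Configuration conf = new Configuration();
conf.setInt("gora.buffer.read.limit", 500);   // rows fetched per select query
conf.setInt("gora.buffer.write.limit", 100);  // records buffered before each flush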



Cassandra in Gora


A Gora Hadoop job simply fetches or emits key/value pairs to process the data. The task performed by the gora-cassandra module is to take the values, which come in as specific instances of Avro records, and store them in Cassandra. So this subproject is just a thin layer between the Gora Hadoop job code and the Cassandra client, which itself interacts with the Cassandra server. That's it.
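
To make the flow concrete, here is a rough client-side sketch of persisting a WebPage through a Gora DataStore. This is only an illustration: the key value is made up, the factory signature follows the Gora tutorial and has varied across versions, and the actual backend (gora-cassandra here) is selected through gora.properties.

import org.apache.gora.store.DataStore;
import org.apache.gora.store.DataStoreFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.nutch.storage.WebPage;

public class GoraUsageSketch {
  public static void main(String[] args) throws Exception {
    // The factory reads gora.properties to pick the DataStore implementation.
    DataStore<String, WebPage> store = DataStoreFactory.getDataStore(
        String.class, WebPage.class, new Configuration());
    WebPage page = store.newPersistent();       // empty Avro record
    // ... populate fields through the generated setters ...
    store.put("com.example/index.html", page);  // buffered in memory
    store.flush();                              // actual write to the backend
    store.close();
  }
}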

gora-cassandra module


Here are the main changes/improvements:

  • Compatibility with Cassandra 0.8
  • Use of Hector as the Cassandra client
  • Concurrency now relies on Hector
  • Not all the features have been implemented yet: delete query...

Avro schema


The object serialization is dictated by an Avro schema. An example can be found in $NUTCH_HOME/src/gora/webpage.avsc:


{"name": "WebPage",
 "type": "record",
 "namespace": "org.apache.nutch.storage",
 "fields": [
        {"name": "baseUrl", "type": "string"}, 
        {"name": "status", "type": "int"},
        {"name": "fetchTime", "type": "long"},
        {"name": "prevFetchTime", "type": "long"},
        {"name": "fetchInterval", "type": "int"},
        {"name": "retriesSinceFetch", "type": "int"},
        {"name": "modifiedTime", "type": "long"},
        {"name": "protocolStatus", "type": {
            "name": "ProtocolStatus",
            "type": "record",
            "namespace": "org.apache.nutch.storage",
            "fields": [
                {"name": "code", "type": "int"},
                {"name": "args", "type": {"type": "array", "items": "string"}},
                {"name": "lastModified", "type": "long"}
            ]
            }},
        {"name": "content", "type": "bytes"},
        {"name": "contentType", "type": "string"},
        {"name": "prevSignature", "type": "bytes"},
        {"name": "signature", "type": "bytes"},
        {"name": "title", "type": "string"},
        {"name": "text", "type": "string"},
        {"name": "parseStatus", "type": {
            "name": "ParseStatus",
            "type": "record",
            "namespace": "org.apache.nutch.storage",
            "fields": [
                {"name": "majorCode", "type": "int"},
                {"name": "minorCode", "type": "int"},
                {"name": "args", "type": {"type": "array", "items": "string"}}
            ]
            }},
        {"name": "score", "type": "float"},
        {"name": "reprUrl", "type": "string"},
        {"name": "headers", "type": {"type": "map", "values": "string"}},
        {"name": "outlinks", "type": {"type": "map", "values": "string"}},
        {"name": "inlinks", "type": {"type": "map", "values": "string"}},
        {"name": "markers", "type": {"type": "map", "values": "string"}},
        {"name": "metadata", "type": {"type": "map", "values": "bytes"}}
   ]
}


The schema is hardcoded in the Nutch class org.apache.nutch.storage.WebPage, a POJO (Plain Old Java Object) that contains the logic used for crawling a web page. It extends org.apache.gora.persistency.impl.PersistentBase. gora-cassandra considers the PersistentBase class as the base class for RECORD fields, and org.apache.gora.persistency.StatefulHashMap as the base class for MAP fields.

The two complex types, MAP and RECORD, are represented in Cassandra by "super columns", which are maps, i.e. sets of key/value pairs. The ARRAY type is represented by a simple column, via a comma-separated list bounded by square brackets, like "[one, two, three]". Not the best.



References