Friday, December 17, 2010

Build Nutch 2.0

Testing Nutch 2.0 under Eclipse



Table of Contents
Introduction
Setup the projects in Eclipse
  Install plugins
  Check out SVN directories
  Build the projects
Nutch
    Datastores
      HSQL
      MySQL
      HBase
      Cassandra
    JUnit Tests
      Datastore
      Fetch
    Nutch Commands
      Running Nutch classes from Eclipse
      crawl
      readdb
      inject
      generate
      fetch
      parse
      updatedb
      solrindex
Crawl script
Conclusion


Introduction


     This is a guide on setting up Nutch 2 in an Eclipse project. You will then be able to hack the code and run tests, especially JUnit ones, pretty easily.

If you already have the Nutch branch or Gora trunk checked out, please run
$ svn up

in order to have their most up-to-date versions. Multiple fixes appear in this document as diff outputs. To apply a fix, save the diff content to a file in the project's root directory and run the patch command

$ patch -p0 < myDiff


Setup the projects in Eclipse


     The idea is to be able to improve the Nutch and Gora code comfortably, with the help of the Eclipse IDE. Add the source directories - and why not the test directories too - of the modules you are interested in working on to the Java Build Path. Then manage the dependencies with the Ivy or Maven plugins to resolve the external libraries, update the code, optionally run a few JUnit tests, run the Ant task that builds the project, and finally submit a patch. This is the easiest and fastest workflow I have found.


Install plugins

Install the Subclipse, IvyDE and m2e plugins if you don't have them yet.
Help > Install New Software ...

Add the following urls:








Check out SVN directories

Check out the Nutch branch and Gora trunk using the SVN wizard, with the following urls

File > New > Project ...












Note that you can just create a Java project and check out the Nutch source with the svn command, if you don't like the SVN Eclipse plugin:

$ cd ~/java/workspace/NutchGora
$ svn co http://svn.apache.org/repos/asf/nutch/branches/nutchgora branch

Build the projects

Window > Show View > Ant
Drag and drop the build.xml files into the Eclipse Ant view.
Just double click on the Gora and Nutch items in the Ant view. That will run the default task. For Gora, it will publish the modules to the local Ivy repository. For Nutch, it will build a "release" in the runtime/local directory.





Nutch


Within the Nutch project, we want to manage the dependencies with the Ivy plugin, not the Maven one.

  • The reference to the "nutch.root" property, which is set in build.xml for Ant, should be replaced in src/plugin/protocol-sftp/ivy.xml by the built-in "basedir" Ivy property. I am not sure how to load such a property in the Eclipse Ivy plugin. This will break the Ant build, so be sure to put it back before running Ant tasks.

These are the Ivy configuration tweaks:

Index: src/plugin/protocol-sftp/ivy.xml
===================================================================
--- src/plugin/protocol-sftp/ivy.xml    (revision 1177967)
+++ src/plugin/protocol-sftp/ivy.xml    (working copy)
@@ -27,7 +27,7 @@
   </info>
 
   <configurations>
-    <include file="${nutch.root}/ivy/ivy-configurations.xml"/>
+    <include file="${basedir}/branch/ivy/ivy-configurations.xml"/>
   </configurations>
 
   <publications>
Index: ivy/ivy.xml
===================================================================
--- ivy/ivy.xml (revision 1177967)
+++ ivy/ivy.xml (working copy)
@@ -21,7 +21,7 @@
        </info>
 
        <configurations>
-               <include file="${basedir}/ivy/ivy-configurations.xml" />
+               <include file="${basedir}/branch/ivy/ivy-configurations.xml" />
        </configurations>
 
        <publications>
@@ -58,8 +58,9 @@
                  <dependency org="org.apache.tika" name="tika-parsers" rev="0.9" />
                -->
 
+               <!--
                <dependency org="org.apache.gora" name="gora-core" rev="0.1.1-incubating" conf="*->compile"/>
-
+-->
                <dependency org="log4j" name="log4j" rev="1.2.15" conf="*->master" />
 
                <dependency org="xerces" name="xercesImpl" rev="2.9.1" />

Now, right click on ivy/ivy.xml, "Add Ivy Library ...". Do the same for src/plugin/protocol-sftp/ivy.xml.














Remove the default src directory as a Source entry in the Java Build Path if it exists. Add at least the "java", "test" and "resources" source folders (which are src/java, src/test and conf) so that they get included in the classpath. That will allow us to run the classes or tests from Eclipse later.



This is what the project tree looks like:





Datastores

The datastore holds the information Nutch crawls from the web. You can opt for a relational database system or a column-oriented, NoSQL store. Thanks to the Gora interface, you can use any backend you might be familiar with: HSQL, MySQL, HBase, Cassandra ...


HSQL


This is the default Gora backend. Make sure your Ivy configuration contains the dependency:
<dependency org="org.hsqldb" name="hsqldb" rev="2.0.0" conf="*->default"/>

This is the content of conf/gora.properties:

gora.sqlstore.jdbc.driver=org.hsqldb.jdbcDriver
gora.sqlstore.jdbc.url=jdbc:hsqldb:hsql://localhost/nutchtest
gora.sqlstore.jdbc.user=sa
gora.sqlstore.jdbc.password=


Set up HSQL. I downloaded this version: HSQLDB 2.0.0. Finally, start the HSQL server with the same database alias, called "nutchtest":

~/java/ext/hsqldb-2.0.0/hsqldb/data$ java -cp ../lib/hsqldb.jar org.hsqldb.server.Server --database.0 file:crawldb --dbname.0 nutchtest
[Server@12ac982]: [Thread[main,5,main]]: checkRunning(false) entered
[Server@12ac982]: [Thread[main,5,main]]: checkRunning(false) exited
[Server@12ac982]: Startup sequence initiated from main() method
[Server@12ac982]: Loaded properties from [/home/alex/java/ext/hsqldb-2.0.0/hsqldb/data/server.properties]
[Server@12ac982]: Initiating startup sequence...
[Server@12ac982]: Server socket opened successfully in 5 ms.
[Server@12ac982]: Database [index=0, id=0, db=file:crawldb, alias=nutchtest] opened sucessfully in 420 ms.
[Server@12ac982]: Startup sequence completed in 426 ms.
[Server@12ac982]: 2011-01-12 10:41:56.181 HSQLDB server 2.0.0 is online on port 9001
[Server@12ac982]: To close normally, connect and execute SHUTDOWN SQL
[Server@12ac982]: From command line, use [Ctrl]+[C] to abort abruptly
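
As a quick sanity check outside of Nutch, you can connect to the running server with plain JDBC, using the same url, user and password as conf/gora.properties. The class below and its table-listing query are just a throwaway sketch, not part of Nutch:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class CheckHsqlConnection {
 public static void main(String[] args) throws Exception {
  // same driver, url, user and password as conf/gora.properties
  Class.forName("org.hsqldb.jdbcDriver");
  Connection conn = DriverManager.getConnection(
    "jdbc:hsqldb:hsql://localhost/nutchtest", "sa", "");
  Statement st = conn.createStatement();
  // list the user tables Gora will have created once a job has run
  ResultSet rs = st.executeQuery(
    "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = 'PUBLIC'");
  while (rs.next()) {
   System.out.println(rs.getString(1));
  }
  conn.close();
 }
}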


MySQL

To use MySQL as datastore, add the dependency to ivy/ivy.xml:

<dependency org="mysql" name="mysql-connector-java" rev="5.1.13" conf="*->default"></dependency>


Change conf/gora.properties to setup the MySQL connection:

gora.sqlstore.jdbc.driver=com.mysql.jdbc.Driver
gora.sqlstore.jdbc.url=jdbc:mysql://localhost:3306/nutch?createDatabaseIfNotExist=true
gora.sqlstore.jdbc.user=alex
gora.sqlstore.jdbc.password=some_pass

You will notice MySQL is a lot faster than HSQL: at least 12x in my case with the default setups. For example, injecting 50k urls took 6 minutes with HSQL versus 30 seconds with MySQL. You could make a similar comparison in Rails with SQLite and MySQL ...


HBase


Add the gora-hbase and zookeeper dependencies in ivy/ivy.xml:

<dependency org="org.apache.gora" name="gora-hbase" rev="0.1" conf="*->compile">
  <exclude org="com.sun.jdmk"/>
  <exclude org="com.sun.jmx"/>
  <exclude org="javax.jms"/>
 </dependency>
 <dependency org="org.apache.zookeeper" name="zookeeper" rev="3.3.2" conf="*->default">
  <exclude org="com.sun.jdmk"/>
  <exclude org="com.sun.jmx"/>
  <exclude org="javax.jms"/>        
 </dependency>  

After rebuilding Nutch, you will need to manually add the HBase jar to the runtime/local/lib directory, since a recent version (0.20.6 ?) is not available in the Maven repository:

$ cp $HBASE_HOME/hbase-*.jar $NUTCH_HOME/runtime/local/lib

Create the $NUTCH_HOME/runtime/local/conf/gora-hbase-mapping.xml file as described in the GORA_HBase wiki page. Override the storage.data.store.class property in runtime/local/conf/nutch-site.xml:

<property>
  <name>storage.data.store.class</name>
  <value>org.apache.gora.hbase.store.HBaseStore</value>
 </property>

Finally, set up and run HBase. See this blog entry as a quick guide.


Cassandra


Start Cassandra:

$ bin/cassandra -f


Run the injector job with the Gora storage class set to the Cassandra store in nutch-site.xml.

<property>
  <name>storage.data.store.class</name>
  <value>org.apache.gora.cassandra.store.CassandraStore</value>
  <description>Default class for storing data</description>
</property>

Place gora-cassandra-mapping.xml in your Nutch conf directory, which is included in the classpath. This configuration defines how Avro fields are stored in Cassandra. This is the content of the gora-cassandra-mapping.xml file:
<gora-orm>
 <keyspace name="webpage" cluster="Test Cluster" host="localhost">
  <family name="p"/>
  <family name="f"/>
  <family name="sc" type="super"/>
 </keyspace>
 <class keyClass="java.lang.String" name="org.apache.nutch.storage.WebPage">
  <!-- fetch fields -->
  <field name="baseUrl" family="f" qualifier="bas"/>
  <field name="status" family="f" qualifier="st"/>
  <field name="prevFetchTime" family="f" qualifier="pts"/>
  <field name="fetchTime" family="f" qualifier="ts"/>
  <field name="fetchInterval" family="f" qualifier="fi"/>
  <field name="retriesSinceFetch" family="f" qualifier="rsf"/>
  <field name="reprUrl" family="f" qualifier="rpr"/>
  <field name="content" family="f" qualifier="cnt"/>
  <field name="contentType" family="f" qualifier="typ"/>
  <field name="modifiedTime" family="f" qualifier="mod"/>
  <!-- parse fields -->
  <field name="title" family="p" qualifier="t"/>
  <field name="text" family="p" qualifier="c"/>
  <field name="signature" family="p" qualifier="sig"/>
  <field name="prevSignature" family="p" qualifier="psig"/>
  <!-- score fields -->
  <field name="score" family="f" qualifier="s"/>
  <!-- super columns -->
  <field name="markers" family="sc" qualifier="mk"/>
  <field name="inlinks" family="sc" qualifier="il"/>
  <field name="outlinks" family="sc" qualifier="ol"/>
  <field name="metadata" family="sc" qualifier="mtdt"/>
  <field name="headers" family="sc" qualifier="h"/>
  <field name="parseStatus" family="sc" qualifier="pas"/>
  <field name="protocolStatus" family="sc" qualifier="prs"/>
 </class>
</gora-orm>




Check that the data has been initialized and is getting populated.


$ bin/cassandra-cli --host localhost
[default@unknown] use webpage;
Authenticated to keyspace: webpage
[default@webpage] update column family p with key_validation_class=UTF8Type;                                                                                                       
138c3060-b623-11e0-0000-242d50cf1fb7
Waiting for schema agreement...
... schemas agree across the cluster
[default@webpage] update column family f with key_validation_class=UTF8Type;
139de3a0-b623-11e0-0000-242d50cf1fb7
Waiting for schema agreement...
... schemas agree across the cluster
[default@webpage] update column family sc with key_validation_class=UTF8Type;
13b2f240-b623-11e0-0000-242d50cf1fb7
Waiting for schema agreement...
... schemas agree across the cluster
[default@webpage] list f;
Using default limit of 100
-------------------
RowKey: com.truveo.www:http/
=> (column=fi, value=2592000, timestamp=1311532210076000)
=> (column=s, value=1.0, timestamp=1311532210080000)
=> (column=ts, value=1311532203790, timestamp=1311532209796000)
-------------------
RowKey: com.blogspot.techvineyard:http/
=> (column=fi, value=2592000, timestamp=1311532210134000)
=> (column=s, value=1.0, timestamp=1311532210137000)
=> (column=ts, value=1311532203790, timestamp=1311532210131000)
-------------------
RowKey: org.apache.wiki:http/nutch/
=> (column=fi, value=2592000, timestamp=1311532210146000)
=> (column=s, value=1.0, timestamp=1311532210149000)
=> (column=ts, value=1311532203790, timestamp=1311532210144000)

3 Rows Returned.
[default@webpage] 



JUnit Tests


Let's run a few unit tests to verify the setup.


Datastore


We want to make the GoraStorage test pass. First apply this patch, which tests your datastore settings while scaling the test down so it does not crash an old laptop with limited capacity.




Index: src/test/org/apache/nutch/storage/TestGoraStorage.java
===================================================================
--- src/test/org/apache/nutch/storage/TestGoraStorage.java      (revision 1053817)
+++ src/test/org/apache/nutch/storage/TestGoraStorage.java      (working copy)
@@ -1,22 +1,17 @@
 package org.apache.nutch.storage;
 
-import java.io.File;
-import java.util.ArrayList;
 import java.util.BitSet;
-import java.util.Iterator;
-import java.util.List;
 import java.util.Random;
-import java.util.Vector;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import junit.framework.TestCase;
+
 import org.apache.avro.util.Utf8;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.nutch.util.NutchConfiguration;
 import org.apache.gora.query.Result;
 import org.apache.gora.store.DataStore;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nutch.util.NutchConfiguration;
 
-import junit.framework.TestCase;
-
 public class TestGoraStorage extends TestCase {
   Configuration conf;
   
@@ -80,8 +75,8 @@
   private AtomicInteger threadCount = new AtomicInteger(0);
   
   public void testMultithread() throws Exception {
-    int COUNT = 1000;
-    int NUM = 100;
+    int COUNT = 50;
+    int NUM = 5;
     DataStore<String,WebPage> store;
     
     for (int i = 0; i < NUM; i++) {
@@ -113,115 +108,4 @@
     assertEquals(size, keys.cardinality());
   }
   
-  public void testMultiProcess() throws Exception {
-    int COUNT = 1000;
-    int NUM = 100;
-    DataStore<String,WebPage> store;
-    List<Process> procs = new ArrayList<Process>();
-    
-    for (int i = 0; i < NUM; i++) {
-      Process p = launch(i, i * COUNT, COUNT);
-      procs.add(p);
-    }
-    
-    while (procs.size() > 0) {
-      try {
-        Thread.sleep(5000);
-      } catch (Exception e) {};
-      Iterator<Process> it = procs.iterator();
-      while (it.hasNext()) {
-        Process p = it.next();
-        int code = 1;
-        try {
-          code = p.exitValue();
-          assertEquals(0, code);
-          it.remove();
-          p.destroy();
-        } catch (IllegalThreadStateException e) {
-          // not ready yet
-        }
-      }
-      System.out.println("* running " + procs.size() + "/" + NUM);
-    }
-    System.out.println("Verifying...");
-    store = StorageUtils.createDataStore(conf, String.class, WebPage.class);
-    Result<String,WebPage> res = store.execute(store.newQuery());
-    int size = COUNT * NUM;
-    BitSet keys = new BitSet(size);
-    while (res.next()) {
-      String key = res.getKey();
-      WebPage p = res.get();
-      assertEquals(key, p.getTitle().toString());
-      int pos = Integer.parseInt(key);
-      assertTrue(pos < size && pos >= 0);
-      if (keys.get(pos)) {
-        fail("key " + key + " already set!");
-      }
-      keys.set(pos);
-    }
-    if (size != keys.cardinality()) {
-      System.out.println("ERROR Missing keys:");
-      for (int i = 0; i < size; i++) {
-        if (keys.get(i)) continue;
-        System.out.println(" " + i);
-      }
-      fail("key count should be " + size + " but is " + keys.cardinality());
-    }
-  }
-  
-  private Process launch(int id, int start, int count) throws Exception {
-    //  Build exec child jmv args.
-    Vector<String> vargs = new Vector<String>(8);
-    File jvm =                                  // use same jvm as parent
-      new File(new File(System.getProperty("java.home"), "bin"), "java");
-
-    vargs.add(jvm.toString());
-
-    // Add child (task) java-vm options.
-    // tmp dir
-    String prop = System.getProperty("java.io.tmpdir");
-    vargs.add("-Djava.io.tmpdir=" + prop);
-    // library path
-    prop = System.getProperty("java.library.path");
-    if (prop != null) {
-      vargs.add("-Djava.library.path=" + prop);      
-    }
-    // working dir
-    prop = System.getProperty("user.dir");
-    vargs.add("-Duser.dir=" + prop);    
-    // combat the stupid Xerces issue
-    vargs.add("-Djavax.xml.parsers.DocumentBuilderFactory=com.sun.org.apache.xerces.internal.jaxp.DocumentBuilderFactoryImpl");
-    // prepare classpath
-    String sep = System.getProperty("path.separator");
-    StringBuffer classPath = new StringBuffer();
-    // start with same classpath as parent process
-    classPath.append(System.getProperty("java.class.path"));
-    //classPath.append(sep);
-    // Add classpath.
-    vargs.add("-classpath");
-    vargs.add(classPath.toString());
-    
-    // append class name and args
-    vargs.add(TestGoraStorage.class.getName());
-    vargs.add(String.valueOf(id));
-    vargs.add(String.valueOf(start));
-    vargs.add(String.valueOf(count));
-    ProcessBuilder builder = new ProcessBuilder(vargs);
-    return builder.start();
-  }
-  
-  public static void main(String[] args) throws Exception {
-    if (args.length < 3) {
-      System.err.println("Usage: TestGoraStore <id> <startKey> <numRecords>");
-      System.exit(-1);
-    }
-    TestGoraStorage test = new TestGoraStorage();
-    test.init();
-    int id = Integer.parseInt(args[0]);
-    int start = Integer.parseInt(args[1]);
-    int count = Integer.parseInt(args[2]);
-    Worker w = test.new Worker(id, start, count, true);
-    w.run();
-    System.exit(0);
-  }
 }
Index: src/test/org/apache/nutch/util/AbstractNutchTest.java
===================================================================
--- src/test/org/apache/nutch/util/AbstractNutchTest.java       (revision 1053817)
+++ src/test/org/apache/nutch/util/AbstractNutchTest.java       (working copy)
@@ -16,28 +16,14 @@
  */
 package org.apache.nutch.util;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-
 import junit.framework.TestCase;
 
-import org.apache.avro.util.Utf8;
+import org.apache.gora.store.DataStore;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.nutch.crawl.URLWebPage;
-import org.apache.nutch.storage.Mark;
 import org.apache.nutch.storage.StorageUtils;
 import org.apache.nutch.storage.WebPage;
-import org.apache.nutch.util.TableUtil;
-import org.apache.gora.query.Query;
-import org.apache.gora.query.Result;
-import org.apache.gora.sql.store.SqlStore;
-import org.apache.gora.store.DataStore;
-import org.apache.gora.store.DataStoreFactory;
-import org.apache.gora.util.ByteUtils;
 
 /**
  * This class provides common routines for setup/teardown of an in-memory data
@@ -55,16 +41,12 @@
   public void setUp() throws Exception {
     super.setUp();
     conf = CrawlTestUtil.createConfiguration();
-    conf.set("storage.data.store.class", "org.gora.sql.store.SqlStore");
     fs = FileSystem.get(conf);
-    // using hsqldb in memory
-    DataStoreFactory.properties.setProperty("gora.sqlstore.jdbc.driver","org.hsqldb.jdbcDriver");
-    // use separate in-memory db-s for tests
-    DataStoreFactory.properties.setProperty("gora.sqlstore.jdbc.url","jdbc:hsqldb:mem:" + getClass().getName());
-    DataStoreFactory.properties.setProperty("gora.sqlstore.jdbc.user","sa");
-    DataStoreFactory.properties.setProperty("gora.sqlstore.jdbc.password","");
     webPageStore = StorageUtils.createWebStore(conf, String.class,
         WebPage.class);
+    
+    // empty the datastore
+    webPageStore.deleteByQuery(webPageStore.newQuery());
   }
 
   @Override












Fetch



We can try to run the Fetcher test as well.

  • Change the location of the static files that will be returned to the Nutch crawler by the Jetty server, from "build/test/data/fetch-test-site" to "src/testresources/fetch-test-site".
  • Also override the plugin directory setting for testing purposes.
  • Set http.agent.name and http.robots.agents properties.
  • Limit the content length to the maximum for a blob column type. This is only required for MySQL.


Index: src/test/nutch-site.xml               
===================================================================
--- src/test/nutch-site.xml     (revision 1053817)
+++ src/test/nutch-site.xml     (working copy)
@@ -22,4 +22,20 @@
<description>Default class for storing data</description>
 </property>
 
+       <property>
+         <name>plugin.folders</name>
+         <value>build/plugins</value>
+       </property>
+       <property>
+         <name>http.agent.name</name>
+         <value>NutchRobot</value>
+       </property>
+       <property>
+         <name>http.robots.agents</name>
+         <value>NutchRobot,*</value>
+       </property>
+       <property>
+         <name>http.content.limit</name>
+         <value>65535</value>
+       </property>
 </configuration>
Index: src/test/org/apache/nutch/fetcher/TestFetcher.java
===================================================================
--- src/test/org/apache/nutch/fetcher/TestFetcher.java  (revision 1050697)
+++ src/test/org/apache/nutch/fetcher/TestFetcher.java  (working copy)
@@ -50,7 +50,7 @@
   public void setUp() throws Exception{
     super.setUp();
     urlPath = new Path(testdir, "urls");
-    server = CrawlTestUtil.getServer(conf.getInt("content.server.port",50000), "build/test/data/fetch-test-site");
+    server = CrawlTestUtil.getServer(conf.getInt("content.server.port",50000), "src/testresources/fetch-test-site");
     server.start();
   }

Now right click on the org.apache.nutch.fetcher.TestFetcher class located in the src/test source directory, then "Run As" > "JUnit Test".





Nutch Commands


Several commands are available to maintain and index your crawl. Here are the possible options from the Bash script:

~/java/workspace/Nutch2.0/runtime/local$ bin/nutch
Usage: nutch [-core] COMMAND
where COMMAND is one of:
 inject inject new urls into the database
 generate generate new segments to fetch from crawl db
 fetch fetch URLs marked during generate
 parse parse URLs marked during fetch
 updatedb update web table after parsing
 readdb read/dump records from page database
 solrindex run the solr indexer on parsed segments and linkdb
 solrdedup remove duplicates from solr
 plugin load a plugin and run one of its classes main()
 or
 CLASSNAME run the class named CLASSNAME
Most commands print help when invoked w/o parameters.

Expert: -core option is for developers only. It avoids building the job jar, 
 instead it simply includes classes compiled with ant compile-core. 
 NOTE: this works only for jobs executed in 'local' mod

Running Nutch classes from Eclipse


You can either run a command with the Bash script or execute a Nutch class directly from Eclipse. The latter is easier for development since you do not need to build the whole project each time you change something. When a Nutch class is executed, it first loads the configuration by looking in the classpath for a nutch-site.xml file that overrides nutch-default.xml. Depending on the order of the "src/test" and "conf" source directories in your Eclipse build path, only one nutch-site.xml file will end up in the classpath. In my case, it was the one located in "src/test". If I edit the one in "conf", I see the warning

The resource is a duplicate of src/test/nutch-default.xml and was not copied to the output folder.

which indicates it will be ignored. So you want to edit the one that is actually active. A quick way to check which file won is sketched after the following point.

  • Apply the modifications to src/test/nutch-site.xml (or conf/nutch-site.xml, depending on your classpath order setting) that are given in the Fetch Test section above.
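
To check which nutch-site.xml actually wins, a tiny throwaway class run from Eclipse can print a few properties the way Nutch resolves them. This is only a sketch; the class name and the chosen properties are arbitrary:

import org.apache.hadoop.conf.Configuration;
import org.apache.nutch.util.NutchConfiguration;

public class ShowActiveConfig {
 public static void main(String[] args) {
  // NutchConfiguration.create() loads nutch-default.xml and then nutch-site.xml
  // from the classpath, so whichever nutch-site.xml Eclipse copied first wins
  Configuration conf = NutchConfiguration.create();
  System.out.println("storage.data.store.class = " + conf.get("storage.data.store.class"));
  System.out.println("http.agent.name = " + conf.get("http.agent.name"));
  System.out.println("plugin.folders = " + conf.get("plugin.folders"));
 }
}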




crawl / org.apache.nutch.crawl.Crawler


~/java/workspace/Nutch2.0/runtime/local$ bin/nutch crawl
Usage: Crawl (<seedDir> | -continue) [-solr <solrURL>] [-threads n] [-depth i] [-topN N]


Right click on org.apache.nutch.crawl.Crawler in the src/java source directory, then "Run As" > "Java Application". A code equivalent is sketched after the list of arguments below.

  1. The first argument, "seedDir", is the path to a directory containing lists of seed urls. They will be injected into the database. They define a forest of pages that will be visited by the crawler during the first iteration of the graph exploration. The crawler then expands the graph by adding neighbours of these pages when extracting new urls out of the page content. These new pages should be visited in the second iteration.
  2. The -continue parameter instead resumes the crawl without injecting any seeds.
  3. -solr defines the Solr server used to index the documents.
  4. -threads defines the number of threads spawned to fetch several pages simultaneously.
  5. -depth defines the number of iterations in the graph exploration, before the traversal gets pruned.
  6. -topN limits the number of urls that get downloaded in one iteration.
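
For reference, running the Crawler from Eclipse boils down to calling its main method with the same arguments as the bin/nutch crawl command. The sketch below is only an illustration: the seed directory, depth, topN and Solr url are example values, not defaults.

public class RunCrawl {
 public static void main(String[] args) throws Exception {
  org.apache.nutch.crawl.Crawler.main(new String[] {
    "seeds",                               // <seedDir>: directory of seed url lists
    "-depth", "2",                         // two iterations of the exploration
    "-topN", "1000",                       // at most 1000 urls fetched per iteration
    "-solr", "http://localhost:8983/solr"  // optional Solr server for indexing
  });
 }
}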


Let's create some input to the crawl command. This is the content of a seeds/urls file that we can use for the demo:

http://techvineyard.blogspot.com/
http://www.truveo.com/
http://wiki.apache.org/nutch/

I used MySQL as the datastore. Let's clear the webpage table, if it already exists, before running the crawl command.

$ mysql -hlocalhost -ualex -psome_pass nutch
mysql> delete from webpage;

From the Eclipse menu:

Run > Run Configurations ...





Click Run. You can compare your output with my logs here. Then check the content of the MySQL table:

mysql> describe webpage;
+-------------------+----------------+------+-----+---------+-------+
| Field             | Type           | Null | Key | Default | Extra |
+-------------------+----------------+------+-----+---------+-------+
| id                | varchar(512)   | NO   | PRI | NULL    |       |
| headers           | blob           | YES  |     | NULL    |       |
| text              | varchar(32000) | YES  |     | NULL    |       |
| status            | int(11)        | YES  |     | NULL    |       |
| markers           | blob           | YES  |     | NULL    |       |
| parseStatus       | blob           | YES  |     | NULL    |       |
| modifiedTime      | bigint(20)     | YES  |     | NULL    |       |
| score             | float          | YES  |     | NULL    |       |
| typ               | varchar(32)    | YES  |     | NULL    |       |
| baseUrl           | varchar(512)   | YES  |     | NULL    |       |
| content           | blob           | YES  |     | NULL    |       |
| title             | varchar(512)   | YES  |     | NULL    |       |
| reprUrl           | varchar(512)   | YES  |     | NULL    |       |
| fetchInterval     | int(11)        | YES  |     | NULL    |       |
| prevFetchTime     | bigint(20)     | YES  |     | NULL    |       |
| inlinks           | blob           | YES  |     | NULL    |       |
| prevSignature     | blob           | YES  |     | NULL    |       |
| outlinks          | blob           | YES  |     | NULL    |       |
| fetchTime         | bigint(20)     | YES  |     | NULL    |       |
| retriesSinceFetch | int(11)        | YES  |     | NULL    |       |
| protocolStatus    | blob           | YES  |     | NULL    |       |
| signature         | blob           | YES  |     | NULL    |       |
| metadata          | blob           | YES  |     | NULL    |       |
+-------------------+----------------+------+-----+---------+-------+
23 rows in set (0.14 sec)

mysql> select count(*) from webpage;
+----------+
| count(*) |
+----------+
|      151 |
+----------+
1 row in set (0.00 sec)

mysql> select id, markers from webpage where content is not null;
+---------------------------------+------------------------------------------+
| id                              | markers                                  |
+---------------------------------+------------------------------------------+
| org.apache.wiki:http/nutch/     | _injmrk_y_updmrk_*1294943864-1806760603  |
| com.blogspot.techvineyard:http/ | _injmrk_y_updmrk_*1294943864-1806760603  |
| com.truveo.www:http/            | _injmrk_y_updmrk_*1294943864-1806760603  |
+---------------------------------+------------------------------------------+
3 rows in set (0.00 sec)


readdb / org.apache.nutch.crawl.WebTableReader


~/java/workspace/Nutch2.0/runtime/local$ bin/nutch readdb
Usage: WebTableReader (-stats | -url [url] | -dump <out_dir> [-regex regex]) [-crawlId <id>] [-content] [-headers] [-links] [-text]
        -crawlId <id>    the id to prefix the schemas to operate on, (default: storage.crawl.id)
        -stats [-sort]  print overall statistics to System.out
                [-sort] list status sorted by host
        -url <url>      print information on <url> to System.out
        -dump <out_dir> [-regex regex]  dump the webtable to a text file in <out_dir>
                -content        dump also raw content
                -headers        dump protocol headers
                -links  dump links
                -text   dump extracted text
                [-regex]        filter on the URL of the webtable entry

The WebTableReader class scans the entire database via a Hadoop job that outputs all the fields.





inject / org.apache.nutch.crawl.InjectorJob


~/java/workspace/Nutch2.0/runtime/local$ bin/nutch inject
Usage: InjectorJob <url_dir> [-crawlId <id>]

First, we need to initialize the crawl db. The "url_dir" argument to the inject command is a directory containing flat files of lists of urls, used as "seeds".

generate / org.apache.nutch.crawl.GeneratorJob


~/java/workspace/Nutch2.0/runtime/local$ bin/nutch generate
GeneratorJob: Selecting best-scoring urls due for fetch.
GeneratorJob: starting
GeneratorJob: filtering: true
GeneratorJob: done
GeneratorJob: generated batch id: 1294943864-1806760603

This step generates a batch id that identifies the urls selected to be fetched.

fetch / org.apache.nutch.fetcher.FetcherJob


~/java/workspace/Nutch2.0/runtime/local$ bin/nutch fetch
Usage: FetcherJob (<batchId> | -all) [-crawlId <id>] [-threads N] [-parse] [-resume] [-numTasks N]
        batchId crawl identifier returned by Generator, or -all for all generated batchId-s
        -crawlId <id>    the id to prefix the schemas to operate on, (default: storage.crawl.id)
        -threads N      number of fetching threads per task
        -parse  if specified then fetcher will immediately parse fetched content
        -resume resume interrupted job
        -numTasks N     if N > 0 then use this many reduce tasks for fetching (default: mapred.map.tasks)


parse / org.apache.nutch.parse.ParserJob


~/java/workspace/Nutch2.0/runtime/local$ bin/nutch parse
Usage: ParserJob (<batchId> | -all) [-crawlId <id>] [-resume] [-force]
        batchId symbolic batch ID created by Generator
        -crawlId <id>    the id to prefix the schemas to operate on, (default: storage.crawl.id)
        -all    consider pages from all crawl jobs
-resume resume a previous incomplete job
-force  force re-parsing even if a page is already parsed

Once we have a local copy of the web pages, we need to parse them to extract keywords and the links each page points to. This parsing task is delegated to Tika.
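
As a rough illustration of that delegation (and not of Nutch's actual parse-tika plugin), here is a minimal standalone Tika call that extracts the text and the detected content type of a page saved to disk; the class name is arbitrary:

import java.io.FileInputStream;
import java.io.InputStream;

import org.apache.tika.metadata.Metadata;
import org.apache.tika.parser.AutoDetectParser;
import org.apache.tika.parser.ParseContext;
import org.apache.tika.sax.BodyContentHandler;

public class TikaParseSketch {
 public static void main(String[] args) throws Exception {
  AutoDetectParser parser = new AutoDetectParser();
  BodyContentHandler text = new BodyContentHandler();
  Metadata metadata = new Metadata();
  InputStream in = new FileInputStream(args[0]); // a page saved to disk
  try {
   // detect the document type and run the matching parser
   parser.parse(in, text, metadata, new ParseContext());
  } finally {
   in.close();
  }
  System.out.println("Content-Type: " + metadata.get(Metadata.CONTENT_TYPE));
  System.out.println(text.toString());
 }
}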

updatedb / org.apache.nutch.crawl.DbUpdaterJob


~/java/workspace/Nutch2.0/runtime/local$ bin/nutch updatedb


solrindex / org.apache.nutch.indexer.solr.SolrIndexerJob


The indexing task is now delegated to Solr, a server built on Lucene indexes that makes the crawled documents searchable by indexing the data posted to it via HTTP. I ran into a few caveats before making it work. This is the suggested patch.

  • Avoid multiple values for the id field.
  • Allow multiple values for the tag field. Add a tld (Top Level Domain) field.
  • Get the content type from the WebPage object's member. Otherwise, you will see NullPointerExceptions.
  • Compare strings with equals(). That's pretty random, but it avoids some surprises.

Index: conf/solrindex-mapping.xml
===================================================================
--- conf/solrindex-mapping.xml  (revision 1053817)
+++ conf/solrindex-mapping.xml  (working copy)
@@ -39,8 +39,7 @@
                <field dest="boost" source="boost"/>
                <field dest="digest" source="digest"/>
                <field dest="tstamp" source="tstamp"/>
-               <field dest="id" source="url"/>
-               <copyField source="url" dest="url"/>
+               <field dest="url" source="url"/>
        </fields>
        <uniqueKey>id</uniqueKey>
 </mapping>
Index: conf/schema.xml
===================================================================
--- conf/schema.xml     (revision 1053817)
+++ conf/schema.xml     (working copy)
@@ -95,12 +95,15 @@
 
         <!-- fields for feed plugin -->
         <field name="author" type="string" stored="true" indexed="true"/>
-        <field name="tag" type="string" stored="true" indexed="true"/>
+        <field name="tag" type="string" stored="true" indexed="true" multiValued="true"/>
         <field name="feed" type="string" stored="true" indexed="true"/>
         <field name="publishedDate" type="string" stored="true"
             indexed="true"/>
         <field name="updatedDate" type="string" stored="true"
             indexed="true"/>
+            
+        <field name="tld" type="string" stored="false" indexed="false"/>
+            
     </fields>
     <uniqueKey>id</uniqueKey>
     <defaultSearchField>content</defaultSearchField>
Index: src/plugin/index-more/src/java/org/apache/nutch/indexer/more/MoreIndexingFilter.java
===================================================================
--- src/plugin/index-more/src/java/org/apache/nutch/indexer/more/MoreIndexingFilter.java        (revision 1053817)
+++ src/plugin/index-more/src/java/org/apache/nutch/indexer/more/MoreIndexingFilter.java        (working copy)
@@ -172,7 +172,7 @@
    */
   private NutchDocument addType(NutchDocument doc, WebPage page, String url) {
     MimeType mimeType = null;
-    Utf8 contentType = page.getFromHeaders(new Utf8(HttpHeaders.CONTENT_TYPE));
+    Utf8 contentType = page.getContentType();
     if (contentType == null) {
       // Note by Jerome Charron on 20050415:
       // Content Type not solved by a previous plugin
Index: src/java/org/apache/nutch/indexer/solr/SolrWriter.java
===================================================================
--- src/java/org/apache/nutch/indexer/solr/SolrWriter.java      (revision 1053817)
+++ src/java/org/apache/nutch/indexer/solr/SolrWriter.java      (working copy)
@@ -56,7 +56,7 @@
       for (final String val : e.getValue()) {
         inputDoc.addField(solrMapping.mapKey(e.getKey()), val);
         String sCopy = solrMapping.mapCopyKey(e.getKey());
-        if (sCopy != e.getKey()) {
+        if (! sCopy.equals(e.getKey())) {
                inputDoc.addField(sCopy, val);
         }
       }

Download Solr. To set up the Solr server, copy the example directory from the Solr distribution, then copy the patched schema.xml configuration file to solr/conf of the Solr app.

cp -r $SOLR_HOME/example solrapp
 cp $NUTCH_HOME/conf/schema.xml solrapp/solr/conf/            
 cd solrapp
 java -jar start.jar

This starts the Solr server. Now let's index a few documents by passing the batch id that shows up in the markers column as a parameter to the SolrIndexerJob class.
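
When running this step from Eclipse rather than through bin/nutch, it amounts to calling the job class' main method with the Solr url and the batch id. This is only a sketch; the values below are just the ones from this walkthrough.

public class RunSolrIndex {
 public static void main(String[] args) throws Exception {
  org.apache.nutch.indexer.solr.SolrIndexerJob.main(new String[] {
    "http://localhost:8983/solr",  // the Solr server started above
    "1294943864-1806760603"        // batch id taken from the markers column
  });
 }
}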








Here are some excerpts of the logs from the Jetty server to make sure the documents were properly sent:

13-Jan-2011 19:50:47 org.apache.solr.update.processor.LogUpdateProcessor finish
INFO: {add=[com.truveo.www:http/, org.apache.wiki:http/nutch/, com.blogspot.techvineyard:http/]} 0 206
13-Jan-2011 19:50:47 org.apache.solr.core.SolrCore execute
INFO: [] webapp=/solr path=/update params={wt=javabin&version=1} status=0 QTime=206 
13-Jan-2011 19:50:47 org.apache.solr.update.DirectUpdateHandler2 commit
INFO: start commit(optimize=false,waitFlush=true,waitSearcher=true,expungeDeletes=false)
13-Jan-2011 19:50:47 org.apache.solr.core.SolrDeletionPolicy onCommit
INFO: SolrDeletionPolicy.onCommit: commits:num=2
        commit{dir=/home/alex/java/perso/nutch/solrapp/solr/data/index,segFN=segments_1,version=1294944630023,generation=1,filenames=[segments_1]
        commit{dir=/home/alex/java/perso/nutch/solrapp/solr/data/index,segFN=segments_2,version=1294944630024,generation=2,filenames=[_0.nrm, _0.tis, _0.fnm, _0.tii, _0.frq, segments_2, _0.fdx, _0.prx, _0.fdt]
13-Jan-2011 19:50:47 org.apache.solr.core.SolrDeletionPolicy updateCommits
INFO: newest commit = 1294944630024

You can now do a search via the api:

$ curl "http://localhost:8983/solr/select/?q=video&indent=on"
<?xml version="1.0" encoding="UTF-8"?>
<response>

<lst name="responseHeader">
 <int name="status">0</int>
 <int name="QTime">0</int>
 <lst name="params">
  <str name="indent">on</str>
  <str name="q">video</str>
 </lst>
</lst>
<result name="response" numFound="2" start="0">
 <doc>
  <arr name="anchor"><str>Logout</str></arr>
  <float name="boost">1.03571</float>
  <str name="date">20110212</str>
  <str name="digest">5d62587216b50ed7e52987b09dcb9925</str>
  <str name="id">com.truveo.www:http/</str>
  <str name="lang">unknown</str>
  <arr name="tag"><str/></arr>
  <str name="title">Truveo Video Search</str>
  <long name="tstamp">2011-02-12T18:37:53.031Z</long>
  <arr name="type"><str>text/html</str><str>text</str><str>html</str></arr>
  <str name="url">http://www.truveo.com/</str>
 </doc>
 <doc>
  <arr name="anchor"><str>Comments</str></arr>
  <float name="boost">1.00971</float>
  <str name="date">20110212</str>
  <str name="digest">59edefd6f4711895c2127d45b569d8c9</str>
  <str name="id">org.apache.wiki:http/nutch/</str>
  <str name="lang">en</str>
  <arr name="subcollection"><str>nutch</str></arr>
  <arr name="tag"><str/></arr>
  <str name="title">FrontPage - Nutch Wiki</str>
  <long name="tstamp">2011-02-12T18:37:53.863Z</long>
  <arr name="type"><str>text/html</str><str>text</str><str>html</str></arr>
  <str name="url">http://wiki.apache.org/nutch/</str>
 </doc>
</result>
</response>




Crawl Script


To automate the crawl process, we might want to use a Bash script that runs the suite of Nutch commands, then add it as a cron job. Don't forget to first initialize the crawl db with the inject command. We run several iterations of the generate/fetch/parse/update cycle with a for loop, and we limit the number of urls that get fetched in one iteration by specifying a -topN argument in the generate command.

#!/bin/bash

# Nutch crawl

export NUTCH_HOME=~/java/workspace/Nutch2.0/runtime/local

# depth in the web exploration
n=1
# number of selected urls for fetching
maxUrls=50000
# solr server
solrUrl=http://localhost:8983

for (( i = 1 ; i <= $n ; i++ ))
do

log=$NUTCH_HOME/logs/log

# Generate
$NUTCH_HOME/bin/nutch generate -topN $maxUrls > $log

batchId=`sed -n 's|.*batch id: \(.*\)|\1|p' < $log`

# rename log file by appending the batch id
log2=$log$batchId
mv $log $log2
log=$log2

# Fetch
$NUTCH_HOME/bin/nutch fetch $batchId >> $log

# Parse
$NUTCH_HOME/bin/nutch parse $batchId >> $log

# Update
$NUTCH_HOME/bin/nutch updatedb >> $log

# Index
$NUTCH_HOME/bin/nutch solrindex $solrUrl $batchId >> $log

done




Conclusion


I managed to fetch 50k urls in one run with these minor changes. With the default values in conf/nutch-default.xml and MySQL as the datastore, these are the log timestamps when running the initialization and one iteration of the generate/fetch/update cycle:
2010-12-13 07:19:26,089 INFO  crawl.InjectorJob - InjectorJob: starting
2010-12-13 07:20:00,077 INFO  crawl.InjectorJob - InjectorJob: finished
2010-12-13 07:20:00,715 INFO  crawl.GeneratorJob - GeneratorJob: starting
2010-12-13 07:20:34,304 INFO  crawl.GeneratorJob - GeneratorJob: done
2010-12-13 07:20:35,041 INFO  fetcher.FetcherJob - FetcherJob: starting
2010-12-13 11:04:00,933 INFO  fetcher.FetcherJob - FetcherJob: done
2010-12-15 01:38:44,262 INFO  crawl.DbUpdaterJob - DbUpdaterJob: starting
2010-12-15 02:15:15,503 INFO  crawl.DbUpdaterJob - DbUpdaterJob: done
 
The next step is comparing this with a setup backed by an HBase datastore. I tried once but got a memory error that left my HBase server instance unresponsive. See the description of the problem.
Please don't hesitate to comment and share your own feedback, difficulties and results.

Sunday, November 28, 2010

Java HttpComponents

HttpComponents & Non-blocking IO


Table of Contents
Introduction
Non-blocking HTTP client
HttpComponents architecture
   1. The IO reactors
   2. The HTTP client and request execution handlers
   3. The HTTP connection
Java HTTP client application
  Multi-Threading
  Non blocking I/O
     java.nio simple application
     httpcore-nio application
     Encoding detection
Conclusion


Introduction


At my company, I initiated a project that consists of checking millions of URLs for their status code (200, 404, ...), in order to flag those that would lead the user to a dead link. So I was looking for an efficient way to download URLs from multiple hosts simultaneously.

In this post, we take a deep look at the HttpComponents framework, formerly Apache Commons HttpClient. Alternatives to HttpComponents exist on both the client and server sides in Java: for clients, you can take a look at another Apache project, MINA, or JBoss' Netty; for servers, Jetty or Sun's Glassfish.



Non-blocking HTTP client



Two approaches are possible when implementing an HTTP client/server. You can create a multi-threaded application that runs a thread per host/client, or a single-threaded one that leverages event-driven non-blocking IO. The second method saves the CPU overhead of the context switches that the multi-threaded model incurs whenever you start handling a request to another host.

That approach creates a reactor, an infinite loop that blocks on a Linux kernel epoll call, in order to perform a readiness selection among all the sockets we are trying to talk to. Then, when a socket is ready to be written to or read from, epoll returns it, and the appropriate action is triggered to handle the request according to the I/O event. The design of such a client follows the pattern of a finite state machine, as we need to define the next step to be performed given the current state of the request: for example, wait for a readable socket after it was written to, in order to get the response to the request that was just sent.

The java.nio package made this approach possible in Java, by introducing the notions of channels and selectors. We now look into the httpcore-nio library from HttpComponents since it provides an API around java.nio that lets you build an asynchronous HTTP client, and hence download URLs as fast as possible.
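
To make the reactor idea concrete before diving into httpcore-nio, here is a bare java.nio readiness-selection loop for a single connection. It is only a sketch: the host is a placeholder, and the actual request writing and response parsing are left as comments.

import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class ReactorSketch {
 public static void main(String[] args) throws Exception {
  Selector selector = Selector.open();
  SocketChannel channel = SocketChannel.open();
  channel.configureBlocking(false);
  channel.connect(new InetSocketAddress("example.org", 80));
  channel.register(selector, SelectionKey.OP_CONNECT);

  while (true) {
   selector.select(); // blocks until at least one channel is ready
   Iterator<SelectionKey> it = selector.selectedKeys().iterator();
   while (it.hasNext()) {
    SelectionKey key = it.next();
    it.remove();
    if (key.isConnectable()) {
     ((SocketChannel) key.channel()).finishConnect();
     key.interestOps(SelectionKey.OP_WRITE); // next state: send the request
    } else if (key.isWritable()) {
     // write the HTTP request here, then wait for the response
     key.interestOps(SelectionKey.OP_READ);
    } else if (key.isReadable()) {
     // read and parse the HTTP response here, then close or reuse the connection
    }
   }
  }
 }
}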

HttpComponents architecture


A good starting point is chapter 2 of the tutorial, called "NIO extension". We will not try to explain the API by commenting on a few snippets, but by describing the workflow while looking at the relationships between all the classes involved via diagrams. I hope you will find complementary information in both resources. One interface plays a key role in the application and will be detailed in this section: org.apache.http.nio.protocol.NHttpRequestExecutionHandler, the HTTP request execution handler. The org.apache.http.nio.protocol.RequestExecutionHandler implementation is described throughout the 3 following points.

The following UML analysis focuses on 3 subgroups of the entire HttpComponents ecosystem:

  1. The IO reactors
  2. The HTTP client and request execution handlers
  3. The HTTP connection

1. The IO reactors



The entry point of the diagram is the DefaultConnectingIOReactor class. By calling its constructor you will be able to create a main reactor that establishes connections in a non-blocking way.


Before doing any HTTP requests, you need to connect to the remote host via the connect method from the ConnectingIOReactor interface, whose prototype is:

(org.apache.http.nio.reactor.ConnectingIOReactor)
 SessionRequest connect(SocketAddress remoteAddress,  SocketAddress localAddress, Object attachment,  SessionRequestCallback callback);

This registers a newly created SocketChannel object with the main Selector object, in order to wait for this socket to become connectable. As soon as the main reactor selects the associated SelectionKey, we can finalize the connection on its SocketChannel.





The designers decided to create worker reactors aside from the main reactor, which will take care of the HTTP request per se. So far we just managed to establish the connection!


Once the worker reactor detects a new ChannelEntry, it registers the associated channel with its worker selector to wait for readability. The socket will not become readable until we submit the HTTP request.

The third argument of the connect method, called "attachment", will be set as a user-defined attribute on the new IOSession object.
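
In code, a connect call that attaches a per-host queue of jobs could look like the sketch below; the Job type, its constructor and the port are placeholders for whatever the application defines.

import java.net.InetSocketAddress;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

import org.apache.http.nio.reactor.ConnectingIOReactor;
import org.apache.http.nio.reactor.SessionRequest;

public class ConnectWithQueue {
 public static SessionRequest connect(ConnectingIOReactor ioReactor, String host) {
  Queue<Job> jobs = new ConcurrentLinkedQueue<Job>();
  jobs.add(new Job(host, "/")); // hypothetical Job(host, path) constructor
  return ioReactor.connect(
    new InetSocketAddress(host, 80), // remote address
    null,                            // local address: let the OS pick one
    jobs,                            // attachment, later exposed as the "queue" attribute
    null);                           // optional SessionRequestCallback
 }
}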

2. The HTTP client and request execution handlers


A session has just been created after the channel was added to the list of new channels. The IO event dispatcher first adds a new NHttpClientConnection attribute to the session object. It then dispatches the event to the HTTP client handler. Once notified via the connected method,

(org.apache.http.nio.NHttpClientHandler)
 void connected(NHttpClientConnection conn, Object attachment);

the NHttpClientHandler calls initalizeContext on its NHttpRequestExecutionHandler member,

(org.apache.http.nio.protocol.RequestExecutionHandler)
 public void initalizeContext(final HttpContext context, final Object attachment) {
  context.setAttribute("queue", attachment);
 }

in order to let the request execution handler know that we are now connected and to set up the application-specific data.







Remember that we added an "attachment" to the session when requesting a connection? We just propagated this attachment as an attribute of the HttpContext object owned by the HTTP connection, in our implementation of NHttpRequestExecutionHandler. Let's call the attribute "queue": it can be, for example, a queue of jobs representing the list of urls we want to download from a single host.

That connected method inside the client handler then calls the requestReady method from the same class:

(org.apache.http.nio.NHttpClientHandler)
 void requestReady(NHttpClientConnection conn);

It will initialize the HTTP request by calling the submitRequest method on the request execution handler. Our implementation loads a new attribute into the HttpContext object, a job polled from the queue. This will allow us to retrieve the job attribute later once we receive a valid response, so that we can update the job with its HTTP status code, for example.

(org.apache.http.nio.protocol.RequestExecutionHandler)
 public HttpRequest submitRequest(final HttpContext context) {
  @SuppressWarnings("unchecked")
  Queue queue = (Queue) context.getAttribute("queue");
  if (queue == null) {
   throw new IllegalStateException("Queue is null");
  }

  Job testjob = queue.poll();
  context.setAttribute("job", testjob);

  if (testjob != null) {
   return generateRequest(testjob);
  } else {
  return null;
  }
 }


"connected" then call the submitRequest method over the NHttpClientConnection object.

3. The HTTP connection



The HTTP connection object performs the request submission. It writes the request into a buffer and turns on the write event mask in the selection key associated with the channel.






To summarize, the sequence of actions performed in order to send the first request is:

BaseIOReactor.sessionCreated / DefaultClientIOEventDispatch.connected / AsyncNHttpClientHandler.connected, requestReady / RequestExecutionHandler.submitRequest /  DefaultNHttpClientConnection.submitRequest / DefaultHttpRequestWriter.write / IOSessionImpl.setEvent(EventMask.WRITE)

and in order to send subsequent requests: 

BaseIOReactor.writable / DefaultClientIOEventDispatch.outputReady / DefaultNHttpClientConnection.produceInput / AsyncNHttpClientHandler.requestReady / RequestExecutionHandler.submitRequest / DefaultNHttpClientConnection.submitRequest / DefaultHttpRequestWriter.write / IOSessionImpl.setEvent(EventMask.WRITE) 

Once the socket becomes readable, here is the cascade of listeners that are triggered by the IO Reactor:

BaseIOReactor.readable / DefaultClientIOEventDispatch.inputReady / DefaultNHttpClientConnection.consumeInput / AsyncNHttpClientHandler.inputReady, processResponse / RequestExecutionHandler.handleResponse


Once the worker reactor detects the socket's readability, the system reads the response, parses it and notifies our request execution handler. At this point we need to retrieve the current job from the HttpContext and we can finally update its result according to the HttpResponse:

(org.apache.http.nio.protocol.RequestExecutionHandler)
 public void handleResponse(final HttpResponse response, final HttpContext context) {
  Job testjob = (Job) context.removeAttribute("job");
  if (testjob == null) {
   throw new IllegalStateException("TestJob is null");
  }

  int statusCode = response.getStatusLine().getStatusCode();
  String content = null;

  HttpEntity entity = response.getEntity();
  if (entity != null) {
   try {
    content = EntityUtils.toString(entity);
   } catch (IOException ex) {
    content = "I/O exception: " + ex.getMessage();
   }
  }
  testjob.setResult(statusCode, content);
 }

That's it. I guess we pretty much went over how the httpcore-nio library handles the lifecycle of the HTTP request. The next section describes an HttpComponents-based client that executes HEAD requests simultaneously.


Java HTTP client application

Before starting anything, check out the HttpComponents httpclient and httpcore trunk versions with SVN. I created an Eclipse project for each directory.


All the application classes are checked in on GitHub, within this directory. The purpose of the application is to do HEAD requests to multiple hosts simultaneously. The input is a list of tab-separated host/url pairs:

6.cn http://6.cn/w/2j5J9gtTAfDpFPUMbZZz2g
6.cn http://6.cn/w/4QNOFBPKza/zbkQDI7ncRg
academicearth.org http://academicearth.org/lectures/biot-savart-law-gauss-law-for-magnetic-fields
academicearth.org http://academicearth.org/lectures/captial-structure-healthcare
affiliate.kickapps.com http://affiliate.kickapps.com/_Stack-and-Tilt/VIDEO/445590/71460.html
agourahills.patch.com http://agourahills.patch.com//articles/elementary-schools-out-for-sumac-fifth-graders#video-500899
alkislarlayasiyorum.com http://alkislarlayasiyorum.com/icerik/40154/
alkislarlayasiyorum.com http://alkislarlayasiyorum.com/icerik/40314/
alkislarlayasiyorum.com http://alkislarlayasiyorum.com/icerik/40321/
alkislarlayasiyorum.com http://alkislarlayasiyorum.com/icerik/40326/
alkislarlayasiyorum.com http://alkislarlayasiyorum.com/icerik/40367/
alkislarlayasiyorum.com http://alkislarlayasiyorum.com/icerik/40443/



A JobQueue object represents the list of urls to be checked within the same host. A Job2 object is mapped to every url. Here is what the Eclipse setup looks like:









Multi-Threading


The parallelization relies on a thread pool that can have as many running threads as connections. See the SyncDeadlinkChecker class. For every JobQueue returned by the iterator, we spawn a new thread to fetch its list of urls.
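
A stripped-down version of that model could look like the sketch below; it assumes a JobQueue with a hypothetical fetchAll() method that checks every url of its host with blocking HttpClient calls (the real SyncDeadlinkChecker class is more elaborate).

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ThreadPoolSketch {
 public static void run(Iterable<JobQueue> hosts, int maxConnections) throws InterruptedException {
  ExecutorService pool = Executors.newFixedThreadPool(maxConnections);
  for (final JobQueue queue : hosts) {
   pool.submit(new Runnable() {
    public void run() {
     queue.fetchAll(); // blocking HttpClient calls, one host per thread
    }
   });
  }
  pool.shutdown(); // no more hosts to submit
  pool.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
 }
}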



Non blocking I/O


java.nio simple application

Let's take a look at a java.nio-based HTTP client. The source is located in the SimpleNHttpClient class. It is "simple" because it only manages one connection and always sends the same type of request, without reacting to the information in the response headers, such as the connection state or the cookies. For example, this is how we create the request:

private void loadRequest(String path) {
  writeLine("HEAD " + path + " HTTP/1.1");
  writeLine("Connection: Keep-Alive");
  writeLine("Host: " + this.host);
  writeLine("User-Agent: TEST-CLIENT/1.1");
  writeLine("");
 }


I guess this class is a good starting point to debug urls that may or may not work on httpcore-nio. An interesting point is that you need 2 buffers, one that contains the raw bytes of the data, the other the actual decoded characters. This means you need to pick an appropriate decoder to parse the response.

Let's run an example on these 2 urls:

http://video.tvguide.com/Date+Night+2010/Date+Night/4866938?autoplay=true%20partnerid=OVG
http://video.tvguide.com/Brooks++Dunn/Put+a+Girl+in+It/5445966?autoplay=true%20partnerid=OVG

Execution log:

0 DEBUG [main] SimpleNHttpClient - Connected non blocking: false
16 DEBUG [main] SimpleNHttpClient - Key is connectable
17 DEBUG [main] SimpleNHttpClient - Connected: true
17 DEBUG [main] SimpleNHttpClient - Key is writable
21 DEBUG [main] SimpleNHttpClient - HEAD /Date+Night+2010/Date+Night/4866938?autoplay=true%20partnerid=OVG HTTP/1.1
Connection: Keep-Alive
Host: video.tvguide.com
User-Agent: TEST-CLIENT/1.1

22 DEBUG [main] SimpleNHttpClient - Number of bytes written: 161
218 DEBUG [main] SimpleNHttpClient - Key is readable
219 DEBUG [main] SimpleNHttpClient - Number of bytes read: 291
219 DEBUG [main] SimpleNHttpClient - HTTP/1.1 200 OK
Server: Microsoft-IIS/6.0
P3P: policyref=" /w3c/p3p.xml", CP="CAO PSA OUR BUS"
X-Powered-By: ASP.NET
X-AspNet-Version: 2.0.50727
Content-Type: text/html; charset=utf-8
Cache-Control: private, max-age=2700
Date: Sat, 20 Nov 2010 05:35:56 GMT
Connection: keep-alive


219 DEBUG [main] SimpleNHttpClient - Key is writable
220 DEBUG [main] SimpleNHttpClient - HEAD /Brooks++Dunn/Put+a+Girl+in+It/5445966?autoplay=true%20partnerid=OVG HTTP/1.1
Connection: Keep-Alive
Host: video.tvguide.com
User-Agent: TEST-CLIENT/1.1

220 DEBUG [main] SimpleNHttpClient - Number of bytes written: 164
221 DEBUG [main] SimpleNHttpClient - Key is readable
221 DEBUG [main] SimpleNHttpClient - Number of bytes read: -1
221 DEBUG [main] SimpleNHttpClient - EOF was reached
222 DEBUG [main] SimpleNHttpClient - Adding again /Brooks++Dunn/Put+a+Girl+in+It/5445966?autoplay=true%20partnerid=OVG
233 DEBUG [main] SimpleNHttpClient - Connected non blocking: false
242 DEBUG [main] SimpleNHttpClient - Key is connectable
242 DEBUG [main] SimpleNHttpClient - Connected: true
243 DEBUG [main] SimpleNHttpClient - Key is writable
243 DEBUG [main] SimpleNHttpClient - HEAD /Brooks++Dunn/Put+a+Girl+in+It/5445966?autoplay=true%20partnerid=OVG HTTP/1.1
Connection: Keep-Alive
Host: video.tvguide.com
User-Agent: TEST-CLIENT/1.1

243 DEBUG [main] SimpleNHttpClient - Number of bytes written: 164
366 DEBUG [main] SimpleNHttpClient - Key is readable
367 DEBUG [main] SimpleNHttpClient - Number of bytes read: 291
367 DEBUG [main] SimpleNHttpClient - HTTP/1.1 200 OK
Server: Microsoft-IIS/6.0
P3P: policyref=" /w3c/p3p.xml", CP="CAO PSA OUR BUS"
X-Powered-By: ASP.NET
X-AspNet-Version: 2.0.50727
Content-Type: text/html; charset=utf-8
Cache-Control: private, max-age=2700
Date: Sat, 20 Nov 2010 05:35:56 GMT
Connection: keep-alive

367 DEBUG [main] SimpleNHttpClient - All responses were received

As you can see, we run into a small caveat at time = 221 ms. In this particular case, we immediately reach EOF when reading the socket. This means we need to disconnect and reconnect to be able to receive a valid response to the request we just sent.


httpcore-nio application



It's easy to run into a few pitfalls while writing the application. One needs to respect the event-driven (asynchronous) nature of programming with non-blocking IO, which may turn writing a unit test into a challenge.


Testing an event-driven application


The test case I took inspiration from is located here:

(org.apache.http.nio.protocol.TestAsyncNHttpHandlers)
 public void testHttpHeads() throws Exception;


It reverses the Inversion of Control by using a wait/notify handshake between threads. A unit test would wait for the job completion by blocking on the wait method:

synchronized(job) {
  try {
   job.wait();
  } catch (InterruptedException ie) {
   LOG.warn(ie);
  }
 }

The request handler would notify the test thread as soon as the response was received:

synchronized(job) {
  job.notify();
 }

This would resume the sleeping test thread which could move on to the next job in a synchronous fashion.



Thread starvation



When I used the very convenient yet treacherous wait/notify exchange per job from above, a thread starvation issue would pop up regularly when processing several urls on a single host. Basically, the greedy I/O dispatcher thread would notify the main thread but keep holding the lock, preventing the idle thread from resuming. So the IO reactor would keep waiting for additional urls without seeing any come in. A second mistake was to reconnect to the host from the main thread. For a similar reason, the connection request should be issued within the thread that runs the reactor.


Let's try to move away from threads as much as possible, since the application is expected to be single-threaded yet deliver the highest performance. This wait/notify exchange per job would prevent us from processing several hosts in parallel, since it is synchronous by design. Unless we spawn a thread per host, which is what we want to avoid...



Features


  • We implement redirect following pretty easily: we just add a new job to the queue after parsing the Location header (see the sketch after this list).
  • We close the connection when the latest response's Connection header is "close" or when the job queue is empty.
  • We create a SessionRequestCallback implementation whose "completed" method gets called when the connection is established, and an NHttpRequestExecutionHandler one whose "handleResponse" method gets called once the response is received.
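
A sketch of the redirect-following idea from the first bullet, reusing the Job and queue types from the handler snippets above; the Job constructor and getHost() accessor are hypothetical:

import java.util.Queue;

import org.apache.http.Header;
import org.apache.http.HttpResponse;

public class RedirectHelper {
 public static void followRedirect(HttpResponse response, Queue<Job> queue, Job job) {
  int status = response.getStatusLine().getStatusCode();
  if (status >= 300 && status < 400) {
   Header location = response.getFirstHeader("Location");
   if (location != null) {
    // enqueue a new job for the redirect target so it gets checked in turn
    queue.add(new Job(job.getHost(), location.getValue()));
   }
  }
 }
}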

Encoding detection



The URL below is not supported by the library, because a non-standard character (§) is sent in ISO-8859-1 encoding.


We first need to parse the charset value in the Content-Type header to be able to select the appropriate decoder that converts the raw bytes to characters. A quick hack consists of replacing the characters that would break the decoding, as suggested in http://old.nabble.com/Please-make-CharsetDecoder-less-strict-in-SessionInputBufferImpl-td24296440.html. Here is a diff of the hack, followed by a sketch of the charset selection:

===================================================================
--- httpcore-nio/src/main/java/org/apache/http/impl/nio/reactor/SessionInputBufferImpl.java (revision 1037110)
+++ httpcore-nio/src/main/java/org/apache/http/impl/nio/reactor/SessionInputBufferImpl.java (working copy)
@@ -36,6 +36,7 @@
 import java.nio.charset.Charset;
 import java.nio.charset.CharsetDecoder;
 import java.nio.charset.CoderResult;
+import java.nio.charset.CodingErrorAction;
 
 import org.apache.http.nio.reactor.SessionInputBuffer;
 import org.apache.http.nio.util.ByteBufferAllocator;
@@ -73,6 +74,7 @@
         this.charbuffer = CharBuffer.allocate(linebuffersize);
         this.charset = Charset.forName(HttpProtocolParams.getHttpElementCharset(params));
         this.chardecoder = this.charset.newDecoder();
+        this.chardecoder.onMalformedInput(CodingErrorAction.REPLACE);
     }
 
     public SessionInputBufferImpl(
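
For the first part, selecting the decoder from the declared charset, a minimal helper could look like the following sketch; the fallback to ISO-8859-1 and the class name are my own choices, in the spirit of the hack above.

import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CodingErrorAction;

import org.apache.http.HeaderElement;
import org.apache.http.HttpResponse;
import org.apache.http.NameValuePair;

public class CharsetHelper {
 public static CharsetDecoder decoderFor(HttpResponse response) {
  String charset = "ISO-8859-1"; // fallback when no charset is declared
  if (response.getFirstHeader("Content-Type") != null) {
   for (HeaderElement element : response.getFirstHeader("Content-Type").getElements()) {
    NameValuePair param = element.getParameterByName("charset");
    if (param != null) {
     charset = param.getValue();
    }
   }
  }
  CharsetDecoder decoder = Charset.forName(charset).newDecoder();
  decoder.onMalformedInput(CodingErrorAction.REPLACE);
  decoder.onUnmappableCharacter(CodingErrorAction.REPLACE);
  return decoder;
 }
}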


Conclusion


Let's execute the AsyncDeadlinkChecker class and compare its performance to the SyncDeadlinkChecker one.


The benchmark is a 40k url input spanning 1k hosts. The machine runs a "Genuine Intel(R) CPU T2050 @ 1.60GHz" dual core with 1GB of RAM, under an advertised 10 Mbps Cable connection, running in reality at 300 kB/s. We obtain the following chart:




First, we have to acknowledge that the download rate showing up here is pretty low. This post is not (yet) about writing the highest-performance HTTP client. The chart shows the rate peaking at around 200 URLs per second. This is actually normal given the size of my input, which contained only 1000 distinct hosts. I need more URLs... At least we have some data to compare both models.

Second, the startup is pretty slow. Indeed, it takes around 10k URLs to reach a "reasonable" rate, higher than 100 URLs per second. This is due to Java internals I am not quite aware of: excessive CPU required to read the input, heap size increase or memory management to fit more data?

Third, HttpClient does not handle concurrency well, since the failure rate is way too high at the beginning (and at the end). I might be using a wrong multi-threaded implementation, or misunderstanding the thread pool class.

Finally, the whole point of this blog entry was to show that the non-blocking I/O implementation performs better than the multi-threaded one. This is reflected in the graph when we remove the noise showing up in the httpclient, synchronous curve, i.e. remove the points of the blue curve that see too many IOExceptions, as indicated by the yellow curve.