Nutch2: Extend Nutch 2 to Crawl via HTTP API


In Nutch 2.x, we can run "nutch server 8080" to start the embedded Jetty server. Nutch exposes a basic REST HTTP interface (built with Restlet), such as /nutch/admin/status and /nutch/admin/stop.

We can easily extend Nutch with a new HTTP endpoint: /crawler. A client calls /crawler with the required parameters to start crawling a website and indexing it into Solr.

How to Implement
Our /crawler API accepts parameters wrapped in a CrawlerConfigureEntity object. Through it, the client can tell /crawler the solrURL, crawlDepth, seed urls, included/excluded url paths, included/excluded file types, crawlID, taskName, and so on; a sample request is sketched below.
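
For example, assuming a JSON converter (such as Restlet's Jackson extension) is registered so the request body maps onto CrawlerConfigureEntity, and assuming the application is mounted under /nutch like the admin endpoints, a call might look like this (host, port, and all field values are illustrative):

curl -X POST -H "Content-Type: application/json" http://localhost:8080/nutch/crawler -d '
{
  "solrURL": "http://localhost:8983/solr",
  "crawlDepth": 3,
  "urls": ["http://docs.example.com/"],
  "includePaths": ["+^http://docs\\.example\\.com/"],
  "excludeFileTypes": ["zip", "gz"],
  "taskName": "docs",
  "tmpCoreName": "core-tmp"
}'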

We can also ship some pre-defined tasks that bundle the seed url file, subcollections.xml, nutch-site.xml and other configuration files. The client then only needs to specify the task name, Solr url, and crawl depth, which is much easier to use.
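
With a pre-defined task, the same request shrinks to something like this (same assumptions about the JSON mapping as above; values are illustrative):

curl -X POST -H "Content-Type: application/json" http://localhost:8080/nutch/crawler -d '{"preDefinedTask": "docs", "solrURL": "http://localhost:8983/solr", "crawlDepth": 5}'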


In CrawlerResource, we create a folder tasks/${taskName}, create or copy the seed url file, put all needed files into its conf folder, and copy the bin folder into tasks/${taskName}. Then we start the crawl script.
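
After this step the task folder looks roughly like this (the task name "docs" is illustrative):

tasks/docs/
  bin/   crawl scripts copied from the Nutch bin folder, made executable
  conf/  nutch-site.xml, regex-urlfilter.txt, suffix-urlfilter.txt, subcollections.xml, ...
  urls/  seeds.txt
  logs/  crawl log output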


Save Index into Temporary Solr Core
We crawl automatically whenever we change our documentation site, and while crawling we don't want to disturb the Solr core that is serving user requests. So we index into a temporary core, and after the crawl finishes we swap it with the core that is serving user requests.
We can also keep up to X historical indexes, so we can revert to a previous index if a crawl fails for some reason.
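
Under the hood the swap is an ordinary Solr CoreAdmin SWAP request; the SolrJ code shown later issues the equivalent of this HTTP call (host and core names are illustrative):

http://localhost:8983/solr/admin/cores?action=SWAP&core=collection1&other=core-tmp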
Implementation Code
1. Register CrawlerResource in NutchApp
org.apache.nutch.api.NutchApp.createInboundRoot()
public synchronized Restlet createInboundRoot() {
  // ...
  router.attach("/"+ CrawlerResource.PATH, CrawlerResource.class);
  return router;
}
2. CrawlerResource handles the /crawler request
package org.apache.nutch.api;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import java.net.MalformedURLException;
import java.net.URLDecoder;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
import org.apache.solr.client.solrj.request.CoreAdminRequest;
import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
import org.restlet.resource.Get;
import org.restlet.resource.Post;
import org.restlet.resource.ServerResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CrawlerResource extends ServerResource {
  private static final Logger LOGGER = LoggerFactory
      .getLogger(CrawlerResource.class);
  private static final String FILE_SUFFIX_URLFILTER_TXT = "suffix-urlfilter.txt";
  private static final String FILE_REGEX_URLFILTER_TXT = "regex-urlfilter.txt";
  private static final String DIR_SEED = "urls";
  private static final String DIR_BIN = "bin";
  public static final String PATH = "crawler";

  @Get @Post
  public Object crawl(final CrawlerConfigureEntity config) throws Exception {
    validateParameter(config);
    LOGGER.info("Accepted crawl request: " + config);
    final File baseLocation = getBaseLocation();
    final File thisTaskBaseDir = createTask(config, baseLocation);

    Map<String, Object> result = new HashMap<String, Object>();
    result.put("start", new Date().toString());

    Thread thread = new Thread(new Runnable() {
      @Override
      public void run() {
        try {
          doCrawl(config, baseLocation, thisTaskBaseDir);
        } catch (Exception e) {
          LOGGER.error("Crawl failed", e);
        }
      }
    });
    thread.start();
    result.put("msg", "Crawl in background.");
    result.put("end", new Date().toString());
    return result;
  }

  private void doCrawl(CrawlerConfigureEntity config, File baseLocation,
      File thisTaskBaseDir) throws MalformedURLException, SolrServerException,
      IOException, InterruptedException {
    long start = new Date().getTime();
    File thisTaskLogsDir = new File(thisTaskBaseDir, "logs");
    thisTaskLogsDir.mkdirs();

    cleanDataIfNeeded(config);
    ProcessBuilder processBuilder = new ProcessBuilder();
    processBuilder.directory(thisTaskBaseDir);
    String crawlScriptPath = thisTaskBaseDir.getAbsolutePath() + File.separator
        + DIR_BIN + File.separator + "crawl";
    String seedDir = thisTaskBaseDir.getAbsolutePath() + File.separator
        + DIR_SEED;

    StringBuilder sb = new StringBuilder();

    sb.append("export MY_NUTCH_HOME='").append(baseLocation).append("';")
        .append("export MY_NUTCH_CONF_DIR='")
        .append(thisTaskBaseDir + File.separator + "conf").append("'; ");
    String solrIndexParams = "";
    if (!StringUtils.isEmpty(config.getSolrinexParams())) {
      solrIndexParams += " --SOLRINEX_PARAMS=\"" + config.getSolrinexParams()
          + "\"";
    }
    String crawlCmd = crawlScriptPath + " " + seedDir + " "
        + config.getCrawlID() + " "
        + (config.getTmpCoreName() == null ? config.getSolrURL()
            : getTmpSolrServerURL(config)) + " " + config.getCrawlDepth()
        + solrIndexParams + " >> " + thisTaskLogsDir.getAbsolutePath()
        + File.separator + "log 2>&1";
    sb.append(crawlCmd);
    processBuilder.command("/bin/bash", "--login", "-c", sb.toString());
    Map<String, String> env = processBuilder.environment();
    env.put("MY_NUTCH_HOME", baseLocation.getAbsolutePath());
    env.put("MY_NUTCH_CONF_DIR", thisTaskBaseDir + File.separator + "conf");
    Process process = processBuilder.start();
    int exitValue = process.waitFor();
    LOGGER.info("Crawl took " + (new Date().getTime() - start) / 1000
        + " seconds, exitCode: " + exitValue);
    // Swap the tmp core with the serving core only after the crawl finishes
    // successfully.
    if (exitValue == 0 && !config.isUpdateDirectly()) {
      updateSolr(config);
    }
  }
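
  // For illustration only (hypothetical names): with taskName "docs", crawlID
  // "docs-crawl", crawlDepth 5 and tmp core "core-tmp", the assembled command
  // resembles:
  //   tasks/docs/bin/crawl tasks/docs/urls docs-crawl \
  //       http://localhost:8983/solr/core-tmp 5 >> tasks/docs/logs/log 2>&1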

  public static void updateSolr(CrawlerConfigureEntity config)
      throws MalformedURLException, SolrServerException, IOException {
    String tmpSolrServer = getTmpSolrServerURL(config);
    LOGGER.info("Start to swap " + tmpSolrServer + " back to "
        + config.getSolrURL());
    // tmp Solr server: host:port/solr/core-tmp
    int idx = tmpSolrServer.lastIndexOf("/");
    // solrBaseUrl: host:port/solr
    String solrBaseUrl = tmpSolrServer.substring(0, idx);

    CommonsHttpSolrServer solrServer = new CommonsHttpSolrServer(solrBaseUrl);

    String oldSolrServer = config.getSolrURL();
    // Whatever follows the Solr base url in the old url, e.g. "/core1" or "".
    String remainStr = oldSolrServer.startsWith(solrBaseUrl)
        ? oldSolrServer.substring(solrBaseUrl.length()) : "";
    String oldCoreName = "";
    if ("".equals(remainStr) || "/".equals(remainStr)) {
      oldCoreName = "collection1";
    } else {
      if (remainStr.charAt(0) == '/') {
        oldCoreName = remainStr.substring(1);
      } else {
        oldCoreName = remainStr;
      }
    }
    String tmpCore = config.getTmpCoreName();
    swapCore(solrServer, oldCoreName, tmpCore);
    // TODO Save old index to core-archive-date
  }

  private static void swapCore(CommonsHttpSolrServer solrServer,
      String corename1, String corename2) throws SolrServerException,
      IOException {
    CoreAdminRequest adminReq = new CoreAdminRequest();
    adminReq.setAction(CoreAdminAction.SWAP);
    adminReq.setCoreName(corename1);
    adminReq.setOtherCoreName(corename2);
    solrServer.request(adminReq);
  }
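
  // Note: Solr's CoreAdmin SWAP atomically exchanges the two cores, so user
  // queries move to the freshly built index with no downtime.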

  private File createTask(CrawlerConfigureEntity config, File baseLocation)
      throws IOException, Exception {
    File tasksBaseDir = new File(baseLocation, "tasks");
    if (!tasksBaseDir.exists()) {
      tasksBaseDir.mkdirs();
    }

    boolean isPredefined = config.getPreDefinedTask() != null;
    File thisTaskBaseDir = new File(tasksBaseDir, config.getTaskName());
    if (!isPredefined && thisTaskBaseDir.exists()) {
      // for developer use only
      if (config.isDeleteIfExist()) {
        FileUtils.deleteDirectory(thisTaskBaseDir);
      } else {
        throw new Exception("Folder " + thisTaskBaseDir + " already exists.");
      }
    }
    // mkdirs() is a no-op if the folder already exists.
    thisTaskBaseDir.mkdirs();
    createTaskSeed(config, baseLocation, thisTaskBaseDir);
    createTaskConfs(config, baseLocation, thisTaskBaseDir);
    copyBinFolder(config, baseLocation, thisTaskBaseDir);
    if (!isPredefined && !config.getSubCollections().isEmpty()) {
      updateSubCollections(new File(thisTaskBaseDir, "conf"),
          config.getSubCollections());
    }
    return thisTaskBaseDir;
  }
  private void cleanDataIfNeeded(CrawlerConfigureEntity config)
      throws MalformedURLException, SolrServerException, IOException {
    if (config.isUpdateDirectly()) {
      if (config.isCleanData()) {
        String solrServerUrl = config.getSolrURL();
        cleanData(solrServerUrl);
      }
    } else {
      String tmpSolrServerStr = getTmpSolrServerURL(config);
      cleanData(tmpSolrServerStr);
    }
  }
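
  // getTmpSolrServerURL examples (urls and core name are illustrative), with
  // tmpCoreName "core-tmp":
  //   http://host:8983/solr       -> http://host:8983/solr/core-tmp
  //   http://host:8983/solr/core1 -> http://host:8983/solr/core-tmp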
  private static String getTmpSolrServerURL(CrawlerConfigureEntity config) {
    String oldSolrServerUrl = config.getSolrURL();

    if (oldSolrServerUrl.endsWith("/")) {
      oldSolrServerUrl = oldSolrServerUrl.substring(0,
          oldSolrServerUrl.length() - 1);
    }
    int idx = oldSolrServerUrl.lastIndexOf("/");
    String str = oldSolrServerUrl.substring(idx + 1);

    String tmpSolrServerStr;
    if (str.equals("solr")) {
      tmpSolrServerStr = oldSolrServerUrl + "/" + config.getTmpCoreName();
    } else {
      tmpSolrServerStr = oldSolrServerUrl.substring(0, idx) + "/"
          + config.getTmpCoreName();
    }
    return tmpSolrServerStr;
  }
  private void cleanData(String solrServerUrl) throws MalformedURLException,
      SolrServerException, IOException {
    CommonsHttpSolrServer solrServer = new CommonsHttpSolrServer(solrServerUrl);
    solrServer.deleteByQuery("*:*");
    solrServer.commit();
  }
  // TODO: create the core (core-xxx) if it doesn't exist; for now this only
  // queries the core to verify it is reachable.
  private void createCore(String solrServerUrl, String newCoreName)
      throws MalformedURLException, SolrServerException {
    CommonsHttpSolrServer solrServer = new CommonsHttpSolrServer(solrServerUrl);
    SolrQuery query = new SolrQuery("*:*").setRows(0);
    solrServer.query(query);
  }

  private void validateParameter(CrawlerConfigureEntity config)
      throws Exception {
    if (StringUtils.isEmpty(config.getSolrURL())) {
      throw new Exception("Must set solrURL");
    }
    if (StringUtils.isEmpty(config.getCrawlID())) {
      config.setCrawlID(UUID.randomUUID().toString());
    }
    if (config.getCrawlDepth() == 0) {
      config.setCrawlDepth(5);
    }
    // A tmp core is only needed when we do NOT update the serving core
    // directly.
    if (!config.isUpdateDirectly()
        && StringUtils.isBlank(config.getTmpCoreName())) {
      String tmpCoreName = "core-" + RandomStringUtils.randomAlphanumeric(3);
      config.setTmpCoreName(tmpCoreName);
    }
    boolean isPredefined = config.getPreDefinedTask() != null;
    if (isPredefined) {
      config.setTaskName(config.getPreDefinedTask());
      if (StringUtils.isBlank(config.getSolrinexParams())) {
        config.setSolrinexParams("update.chain=webCrawlerChain");
      }
    } else {
      if (config.getUrls() == null || config.getUrls().isEmpty()) {
        throw new Exception("Must set urls to crawl.");
      }
      if (StringUtils.isEmpty(config.getTaskName())) {
        config.setTaskName(config.getCrawlID());
      }
    }
  }
  private File copyBinFolder(CrawlerConfigureEntity config, File baseLocation,
      File thisTaskBaseDir) throws IOException {
    File destBinDir = new File(thisTaskBaseDir, DIR_BIN);
    if (config.getPreDefinedTask() != null) {
      if (destBinDir.exists()) {
        return destBinDir;
      }
    }
    destBinDir.mkdirs();
    File srcBinDir = new File(baseLocation, DIR_BIN);
    FileUtils.copyDirectory(srcBinDir, destBinDir);
    // make all files in destBinDir executable
    File[] files = destBinDir.listFiles();
    if (files != null) {
      for (File file : files) {
        file.setExecutable(true);
      }
    }
    return destBinDir;
  }

  private void overwriteWithPredefinedFile(File oldFile,
      CrawlerConfigureEntity config) throws IOException {
    String fileName = oldFile.getName();
    int indx = fileName.indexOf(".");
    String preDefinedFN = fileName.substring(0, indx) + "-"
        + config.getPreDefinedTask() + fileName.substring(indx);
    File preDefinedFile = new File(oldFile.getParentFile(), preDefinedFN);
    if (preDefinedFile.exists()) {
      FileUtils.deleteQuietly(oldFile);
      FileUtils.copyFile(preDefinedFile, oldFile);
    }
  }
  private void createTaskConfs(CrawlerConfigureEntity config,
      File baseLocation, File thisTaskBaseDir) throws IOException {
    File srcConfDir = new File(baseLocation, "conf");
    File thisTaskConfDir = new File(thisTaskBaseDir, "conf");
    String preDefinedTask = config.getPreDefinedTask();
    if (preDefinedTask != null) {
      if (thisTaskConfDir.exists()) {
        return;
      } else {
        thisTaskConfDir.mkdir();
        FileUtils.copyDirectory(srcConfDir, thisTaskConfDir);
        String[] fileStrs = { "nutch-site.xml", "subcollections.xml",
            FILE_REGEX_URLFILTER_TXT, FILE_SUFFIX_URLFILTER_TXT };
        for (String str : fileStrs) {
          overwriteWithPredefinedFile(new File(thisTaskConfDir, str), config);
        }
      }
    } else {
      thisTaskConfDir.mkdir();
      FileUtils.copyDirectory(srcConfDir, thisTaskConfDir);

      // handle include and exclude paths
      List<String> paths = config.getIncludePaths();
      File regexUrlfilterFile = new File(thisTaskConfDir,
          FILE_REGEX_URLFILTER_TXT);
      if (!paths.isEmpty()) {
        appendLines(regexUrlfilterFile, paths);
      }

      paths = config.getExcludePaths();
      if (!paths.isEmpty()) {
        appendLines(regexUrlfilterFile, paths);
      }
      // handle types
      File suffixUrlfilterFile = new File(thisTaskConfDir,
          FILE_SUFFIX_URLFILTER_TXT);
      List<String> suffixFilters = FileUtils.readLines(suffixUrlfilterFile);
      List<String> excludeTypes = config.getExcludeFileTypes();

      for (String excludeType : excludeTypes) {
        if (!suffixFilters.contains(excludeType)) {
          suffixFilters.add(excludeType);
        }
      }
      List<String> includeTypes = config.getIncludeFileTypes();
      for (String includeType : includeTypes) {
        if (suffixFilters.contains(includeType)) {
          suffixFilters.remove(includeType);
        }
      }
      FileUtils.writeLines(suffixUrlfilterFile, suffixFilters);
    }
  }
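
  // Illustrative (hypothetical) entries appended to regex-urlfilter.txt above;
  // includePaths/excludePaths are assumed to already use urlfilter syntax:
  //   +^http://docs\.example\.com/guide/
  //   -^http://docs\.example\.com/archive/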
  public static void appendLines(File file, String encoding,
      Collection<String> lines, String lineEnding) throws IOException {
    OutputStream out = null;
    try {
      out = new FileOutputStream(file, true);
      IOUtils.writeLines(lines, lineEnding, out, encoding);
    } finally {
      IOUtils.closeQuietly(out);
    }
  }
  public static void appendLines(File file, Collection<String> lines)
      throws IOException {
    appendLines(file, null, lines, null);
  }
  private File createTaskSeed(CrawlerConfigureEntity config, File baseLocation,
      File thisTaskBaseDir) throws IOException {
    // create urls/seeds.txt and write the seed urls into it.
    File urlsDir = new File(thisTaskBaseDir, DIR_SEED);
    urlsDir.mkdirs();
    File seedFile = new File(urlsDir, "seeds.txt");
    if (config.getPreDefinedTask() == null) {
      seedFile.delete();
      seedFile.createNewFile();
      FileUtils.writeLines(seedFile, config.getUrls());
    } else {
      if (!seedFile.exists()) {
        FileUtils.copyFile(new File(baseLocation, DIR_SEED + "/" + "seeds-"
            + config.getPreDefinedTask() + ".txt"), seedFile);
      }
    }

    return seedFile;
  }
  // Save subcollections into conf/subcollections.xml
  public void updateSubCollections(File thisTaskConfDir,
      List<SubCollectionEntity> subcollections) throws IOException {
    StringBuilder sb = new StringBuilder();
    sb.append("<subcollections>");
    final Iterator<SubCollectionEntity> iterator = subcollections.iterator();
    while (iterator.hasNext()) {
      final SubCollectionEntity subCol = iterator.next();
      sb.append("<subcollection>").append("<name>").append(subCol.getName())
          .append("</name>").append("<id>").append(subCol.getName())
          .append("</id>").append("<whitelist>")
          .append(subCol.whiteListString()).append("</whitelist>")
          .append("<blacklist>").append(subCol.blackListString())
          .append("</blacklist>").append("</subcollection>");
    }
    sb.append("</subcollections>");
    FileUtils.writeStringToFile(
        new File(thisTaskConfDir, "subcollections.xml"), sb.toString());
  }
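
  // Sample generated subcollections.xml (names and urls are illustrative):
  //   <subcollections><subcollection><name>docs</name><id>docs</id>
  //   <whitelist>http://docs.example.com/</whitelist>
  //   <blacklist></blacklist></subcollection></subcollections>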
  private File getBaseLocation() throws UnsupportedEncodingException {
    File jarPath = new File(this.getClass().getProtectionDomain()
        .getCodeSource().getLocation().getPath());
    String baseLocation = jarPath.getParentFile().getParent();
    baseLocation = URLDecoder.decode(baseLocation,
        System.getProperty("file.encoding"));
    return new File(baseLocation);
  }
}
3. CrawlerConfigureEntity
public class CrawlerConfigureEntity {
  private String preDefinedTask;
  private boolean updateDirectly, cleanData;
  private String tmpCoreName, taskName;
  private List<String> urls;
  private String solrURL, crawlID;
  private int crawlDepth;
  private List<String> includePaths, excludePaths, includeFileTypes, excludeFileTypes;
  private List<SubCollectionEntity> subCollections = new ArrayList<SubCollectionEntity>();
  private String solrinexParams;
  private boolean sync = false, deleteIfExist = false;
  // getters and setters omitted
}

public class SubCollectionEntity {
  private String name, id;
  private List<String> blackList, whiteList;
  // getters, setters, whiteListString() and blackListString() omitted
}
