Multithreaded background executor using Guava

We have a bunch of audio files that must be downloaded from a remote site and stored locally. Once all the downloads have completed, we must persist data about the local files in a database.

This routine uses a pool of threads (a Guava ListeningExecutorService) to perform the downloads in parallel. Each download is represented by a Future<String>; once the download completes, the string contains data about the local file. Each download is handed to a thread from the pool, and the corresponding Future<String> is added to a List<Future<String>>.
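
The submit-and-collect part of this design can be sketched with the plain JDK ExecutorService, which ListeningExecutorService extends. This is a minimal, self-contained sketch under stated assumptions, not the post's code: the hypothetical fakeDownload() stands in for the real network call, and, unlike the post, the sketch blocks on each Future.get() to show the results in submission order instead of combining the futures asynchronously.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class DownloadFuturesSketch {

  // Hypothetical stand-in for the real blocking download: it only derives a
  // file name from the URL and returns the "key|filename" descriptor.
  static String fakeDownload(String key, String url) {
    String filename = url.substring(url.lastIndexOf('/') + 1);
    return key + "|" + filename;
  }

  // Submits one Callable per URL to a fixed pool, keeps the futures in a
  // List<Future<String>>, then waits for each of them in submission order.
  static List<String> downloadAll(Map<String, String> urlsByKey, int numThreads) {
    ExecutorService pool = Executors.newFixedThreadPool(numThreads);
    try {
      List<Future<String>> futures = new ArrayList<>();
      for (Map.Entry<String, String> entry : urlsByKey.entrySet()) {
        final String key = entry.getKey();
        final String url = entry.getValue();
        Callable<String> task = () -> fakeDownload(key, url);
        futures.add(pool.submit(task));
      }
      List<String> results = new ArrayList<>();
      for (Future<String> future : futures) {
        try {
          results.add(future.get()); // blocks until this particular task is done
        } catch (InterruptedException ex) {
          Thread.currentThread().interrupt();
          throw new IllegalStateException(ex);
        } catch (ExecutionException ex) {
          throw new IllegalStateException(ex.getCause());
        }
      }
      return results;
    } finally {
      pool.shutdown(); // no new tasks; the pool's threads exit once idle
    }
  }
}
```

With a LinkedHashMap input the results come back in insertion order, because the futures are collected in the order they were submitted, regardless of which download finishes first.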

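The actual download is a blocking call to Apache Commons IO's FileUtils.copyURLToFile(). Provided the pool has at least as many threads as there are files, all downloads start practically at the same time; otherwise the extra downloads wait in the pool's queue until a thread becomes free.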
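
Whether the blocking downloads really overlap depends on the pool size. The following JDK-only sketch (class and method names are illustrative) submits tasks that each block until every task has started: with at least as many threads as tasks the latch reaches zero, while with fewer threads the queued tasks never get a thread and the check times out.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class ConcurrencySketch {

  // Returns true if numTasks blocking tasks were all running at the same time
  // on a fixed pool of numThreads threads.
  static boolean allStartTogether(int numTasks, int numThreads) {
    ExecutorService pool = Executors.newFixedThreadPool(numThreads);
    CountDownLatch started = new CountDownLatch(numTasks);
    try {
      for (int i = 0; i < numTasks; i++) {
        pool.submit(() -> {
          started.countDown(); // this task now occupies a pool thread
          started.await();     // block, like a download, until all have started
          return null;
        });
      }
      // The count reaches zero only if every task got its own thread.
      return started.await(1, TimeUnit.SECONDS);
    } catch (InterruptedException ex) {
      Thread.currentThread().interrupt();
      return false;
    } finally {
      pool.shutdownNow(); // interrupts tasks still blocked in await()
    }
  }
}
```

For example, allStartTogether(5, 5) succeeds, whereas allStartTogether(5, 2) leaves three tasks waiting in the queue and returns false after the one-second timeout.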

Using the Guava static method Futures.successfulAsList(), we immediately create a new Future that will return a List<String>. The strings in this list are those returned by the download futures above, in the same order; if a download fails or is cancelled, its position in the list holds null instead.
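
The semantics of successfulAsList() can be mimicked with the JDK's CompletableFuture, which is handy for experimenting without Guava on the classpath. This is an analogue, not Guava's implementation: each input is first mapped so that a failure becomes a null value, then CompletableFuture.allOf() waits for all of them and the values are gathered in input order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class SuccessfulAsListSketch {

  // JDK-only analogue of Guava's Futures.successfulAsList(): the returned future
  // completes when every input is done; a failed input yields null at its index.
  static <T> CompletableFuture<List<T>> successfulAsList(List<CompletableFuture<T>> futures) {
    // Turn each failure (or cancellation) into a normal completion with null.
    List<CompletableFuture<T>> tolerant = new ArrayList<>();
    for (CompletableFuture<T> future : futures) {
      tolerant.add(future.handle((value, error) -> error == null ? value : null));
    }
    // allOf() cannot fail here, because none of the tolerant futures fails.
    return CompletableFuture.allOf(tolerant.toArray(new CompletableFuture<?>[0]))
        .thenApply(ignored -> {
          List<T> results = new ArrayList<>();
          for (CompletableFuture<T> future : tolerant) {
            results.add(future.join()); // already complete, never throws here
          }
          return results;
        });
  }
}
```

In the download scenario this means a download that throws surfaces as a null entry in the combined list, so whatever consumes the list has to expect nulls.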

We immediately attach a Guava FutureCallback to this Future<List<String>> and return control right away to the process that invoked this background execution in the first place.

The FutureCallback is invoked once that Future completes (that’s to say: when the last download has finished) and stores the resulting strings in the database (the operation is called “postprocessAudioSurvey”). Since FutureCallback.onSuccess() returns void, the callback can only produce side effects such as this database write; it cannot hand a value back to the caller.
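
The attach-a-callback-and-return pattern can be sketched in JDK-only form, using CompletableFuture.whenComplete() in place of Futures.addCallback(). The whenAllDone() helper and its two Consumer parameters are hypothetical stand-ins for FutureCallback's onSuccess() and onFailure(); like them, both handlers return void.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

final class CallbackSketch {

  // Registers the two handlers on the combined future and returns at once;
  // nothing here calls get() or join(), so the caller is never blocked.
  static void whenAllDone(CompletableFuture<List<String>> allResults,
                          Consumer<List<String>> onSuccess,
                          Consumer<Throwable> onFailure) {
    allResults.whenComplete((results, error) -> {
      if (error == null) {
        onSuccess.accept(results); // e.g. persist the "key|filename" strings
      } else {
        onFailure.accept(error);   // e.g. log that results could not be collected
      }
    });
  }
}
```

whenComplete() runs the handler in whichever thread completes the future (or in the caller's thread if the future is already complete), much like Guava's directExecutor(); whenCompleteAsync(action, executor) lets you choose the thread explicitly.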

The application is Spring-powered and the threadpool is started and stopped using the Spring API.

We create and initialize the ExecutorService when the whole application starts:


  ListeningExecutorService pool;

  public void initIt() throws Exception {
    try {
      pool = MoreExecutors.listeningDecorator(Executors
          .newFixedThreadPool(numThreads));
      logger.info("successfully started threadpool for audio survey background processing");
    } catch (Exception ex) {
      logger.warn("problems starting audio survey threadpool: "
          + ex.getMessage());
    }
  }

We shut down the executor when the whole application stops:


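  public void cleanUp() throws Exception {
    // stop accepting new downloads; already-submitted ones keep running
    pool.shutdown();
    logger.info("awaiting termination.....");
    // wait up to 5 seconds for the running downloads to finish
    if (!pool.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
      // give up on the stragglers: interrupt the worker threads
      pool.shutdownNow();
    }
    logger.info("successful shutdown threadpool for audio survey background processing");
  }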
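
The order of the shutdown calls matters: awaitTermination() returns true only after shutdown() (or shutdownNow()) has been requested and every task has finished, so calling it first simply waits out the timeout. The small JDK-only sketch below makes the difference observable; the shutdownThenAwait() helper is illustrative and follows the pattern recommended in the ExecutorService Javadoc.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class ShutdownOrderSketch {

  // awaitTermination() without a prior shutdown(): an idle pool never terminates
  // on its own, so the call just waits out the timeout and returns false.
  static boolean awaitWithoutShutdown(long timeoutMillis) {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    try {
      pool.submit(() -> { }); // the only task finishes immediately
      return pool.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (InterruptedException ex) {
      Thread.currentThread().interrupt();
      return false;
    } finally {
      pool.shutdownNow();
    }
  }

  // shutdown() first, then wait; if tasks are still running after the timeout,
  // interrupt them with shutdownNow() and wait once more.
  static boolean shutdownThenAwait(ExecutorService pool, long timeoutMillis) {
    pool.shutdown(); // stop accepting new tasks, let submitted ones finish
    try {
      if (pool.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS)) {
        return true;
      }
      pool.shutdownNow();
      return pool.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (InterruptedException ex) {
      pool.shutdownNow();
      Thread.currentThread().interrupt();
      return false;
    }
  }
}
```

Keep in mind that shutdownNow() only interrupts the worker threads; a blocking I/O call that ignores interruption keeps running until its own timeout expires.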

This is the code of the main routine:


  public void execute(final Integer idSurvey, final Map<String, String> urlsByKey) {
    // define file storage dir - create it if necessary

    ...............

    // we extract each url to download from the map urlsByKey
    List<ListenableFuture<String>> audioDownloadFutures = new ArrayList<ListenableFuture<String>>();
    // main cycle for all audio urls
    for (final Map.Entry<String, String> entry : urlsByKey.entrySet()) {

      final String audioUrlKey = entry.getKey();
      final String audioUrl = entry.getValue();
      String ext = FilenameUtils.getExtension(audioUrl);

      // define single audio filename and File
      final String filename = UUID.randomUUID().toString() + "." + ext;
      final File downloadedAudioFile = new File(surveyAudioFileDir, filename);

      // create future of the download by submitting a Callable into the thread pool
      final ListenableFuture<String> audioDownloadFuture = pool.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
              // download file, store it and return its location
              logger.info("Start copy " + audioUrl + " to " + downloadedAudioFile);
              // here we need a blocking call!! (connection and read timeouts: 15 s each)
              try {
                URL website = new URL(audioUrl);
                FileUtils.copyURLToFile(website,
                    downloadedAudioFile, 15000, 15000);
              } catch (Exception e) {
                logger.warn("PROBLEMS copying " + audioUrl + ": " + e.getMessage());
                // rethrow so this future fails and successfulAsList() reports null for it
                throw e;
              }
              logger.info("--> End copy");
              // return String to use later in further processing
              return audioUrlKey + "|" + filename;
            }
          });
      // add future to the List<ListenableFuture<String>>
      audioDownloadFutures.add(audioDownloadFuture);
    }
    // transform the List<ListenableFuture<String>> into a ListenableFuture<List<String>>
    ListenableFuture<List<String>> successfulStringOutcomes = Futures
        .successfulAsList(audioDownloadFutures);
    // add callback to be executed when the Future<List<String>> completes;
    // newer Guava releases only offer the overload that takes an explicit Executor,
    // so the callback (and its database work) runs on the pool
    Futures.addCallback(successfulStringOutcomes,
        new FutureCallback<List<String>>() {
          @Override // success -> postprocess operations
          public void onSuccess(List<String> downloadResults) {
            // failed downloads show up as null entries: keep only the real ones
            List<String> completed = new ArrayList<String>();
            for (String result : downloadResults) {
              if (result != null) {
                completed.add(result);
              }
            }
            surveyService.postprocessAudioSurvey(idSurvey, completed);
          }
          @Override // failure -> log error
          public void onFailure(Throwable t) {
            logger.warn("problems collecting results of file downloads", t);
          }
        }, pool);
  } // control returns immediately to the thread that invoked this method
