Java Lambda-Enabled Concurrency - Don't Use Mutable Variables


What's wrong with the following code?
The following code tries to submit all query tasks to a thread pool, but it fails to do that: it only reads part of the data.
private List<Future<List<T>>> findAllAsyncImpl(final SolrParams params, final int readSize,
        final int totalCount) {
    final ModifiableSolrParams query = new ModifiableSolrParams(params);
    query.set(CommonParams.ROWS, readSize);
    final List<Future<List<T>>> futures = new ArrayList<>(totalCount);
    int start = 0;
    while (start < totalCount) {
        query.set(CommonParams.START, start);
        futures.add(executor.submit(() -> querySolr(new ModifiableSolrParams(query))));
        start += readSize;
    }
    return futures;
}

I checked the log. With readSize set to 1000, I expected queries with start=0, 1000, 2000, and so on up to totalCount-1000. Instead, some start values (here start=45000) were queried multiple times and others were never queried at all.
AbstractSolrRepository querySolr: q=omitted...&rows=1000&start=45000
AbstractSolrRepository querySolr: q=omitted...&rows=1000&start=45000
AbstractSolrRepository querySolr: q=omitted...&rows=1000&start=45000
AbstractSolrRepository querySolr: q=omitted...&rows=1000&start=45000

What the code is doing
Now let's convert the lambda expression to an anonymous class: in Eclipse, use Quick Fix (Command+1), then select "Convert to anonymous class creation".
The second snippet goes one step further and moves the task into an explicit Callable class.


while (start < totalCount) {
    query.set(CommonParams.START, start);
    futures.add(executor.submit(new Callable<List<T>>() {
        @Override
        public List<T> call() throws Exception {
            return querySolr(new ModifiableSolrParams(query));
        }
    }));
    start += readSize;
}


while (start < totalCount) {
    query.set(CommonParams.START, start);
    futures.add(executor.submit(new SolrQueryTask(query)));
    start += readSize;
}

private class SolrQueryTask implements Callable<List<T>> {
    private final ModifiableSolrParams query;
    public SolrQueryTask(final ModifiableSolrParams query) {
        this.query = query;
    }
    public List<T> call() throws Exception {
        return querySolr(new ModifiableSolrParams(query));
    }
}
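Now the root cause is clearer: each task runs later, on a different thread, but it holds a reference to the same query object that the loop keeps mutating. The copy, new ModifiableSolrParams(query), is made inside call(), so it happens when the task actually runs, not when it is submitted. A task submitted when start=0 therefore reads whatever start value the loop has set by the time the task executes.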
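The effect can be reproduced without Solr. The following is a minimal sketch, not the original repository code: CaptureDemo and collect are hypothetical names, a plain Map stands in for ModifiableSolrParams, and a CountDownLatch forces every task to run after the loop has finished so the outcome is deterministic. In real code the timing varies, which is why the log shows a mix of repeated and missing start values.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class CaptureDemo {

    /** Submits one task per page and returns the "start" value each task saw. */
    static List<Integer> collect(boolean snapshot, int totalCount, int readSize) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch loopDone = new CountDownLatch(1);
        // Hypothetical stand-in for ModifiableSolrParams: one object mutated by the loop.
        Map<String, Integer> query = new HashMap<>();
        List<Future<Integer>> futures = new ArrayList<>();
        try {
            for (int start = 0; start < totalCount; start += readSize) {
                query.put("start", start);
                // snapshot == true copies the state now, on the submitting thread;
                // snapshot == false hands the task the shared, still-changing object.
                Map<String, Integer> seen =
                        snapshot ? new HashMap<String, Integer>(query) : query;
                futures.add(executor.submit(() -> {
                    loopDone.await();          // run only after the loop has finished
                    return seen.get("start");
                }));
            }
            loopDone.countDown();
            List<Integer> result = new ArrayList<>();
            for (Future<Integer> future : futures) {
                result.add(future.get());
            }
            return result;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(collect(false, 3000, 1000)); // prints [2000, 2000, 2000]
        System.out.println(collect(true, 3000, 1000));  // prints [0, 1000, 2000]
    }
}
```

With the shared object every task reports the last value the loop wrote; with a per-task copy each task keeps the value that was current when it was submitted.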

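The solution
Build a fresh ModifiableSolrParams for each task on the submitting thread, so that no task shares mutable state with the loop.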
private List<Future<List<T>>> findAllAsync(final SolrParams params, final int readSize,
        final int totalCount) {
    final List<Future<List<T>>> futures = new ArrayList<>(totalCount);
    int start = 0;
    while (start < totalCount) {
        final ModifiableSolrParams targetParams = new ModifiableSolrParams(params)
                .set(CommonParams.ROWS, readSize)
                .set(CommonParams.START, start);

        futures.add(executor.submit(() -> querySolr(targetParams)));
        start += readSize;
    }
    return futures;
}
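The method above returns the futures in submission order, so a caller can merge the pages back in the same order. The helper below is only a sketch of one way to do that; FutureResults and joinAll are hypothetical names that do not appear in the original code, and wrapping failures in IllegalStateException is an assumption about how errors should surface.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

class FutureResults {

    /** Waits for each page in submission order and concatenates the results. */
    static <T> List<T> joinAll(List<Future<List<T>>> futures) {
        List<T> all = new ArrayList<>();
        try {
            for (Future<List<T>> future : futures) {
                all.addAll(future.get());   // blocks until this page is ready
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();   // preserve the interrupt flag
            throw new IllegalStateException("Interrupted while waiting for a page", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("A page query failed", e.getCause());
        }
        return all;
    }
}
```

Because the futures are read in list order, the merged list keeps the page order even when the tasks finish out of order.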
