Multithreading help using ExecutorService in Java

Anit Published on 2018-02-14 05:44:50Z

This question already has an answer here:

  • ExecutorService Future::get very slow 1 answer

I am trying to search for a list of words and find the total count of all the words across multiple files.

My approach is to use a separate thread for each file to get its count, then aggregate the counts returned by all the threads.

Say I have 50 files of 1 MB each. Using multiple threads does not improve performance: the total execution time is almost the same whether FILE_THREAD_COUNT is 1 or 50.

Am I doing something wrong in using the executor service?

Here is my code.

public void searchText(List<File> filesInPath, Set<String> searchWords) {
    try {
        BlockingQueue<File> filesBlockingQueue = new ArrayBlockingQueue<>(filesInPath.size());
        filesBlockingQueue.addAll(filesInPath);

        ExecutorService executorService = Executors.newFixedThreadPool(FILE_THREAD_COUNT);
        int totalWordCount = 0;
        while (!filesBlockingQueue.isEmpty()) {
            Callable<Integer> task = () -> {
                int wordCount = 0;
                try {
                    File file = filesBlockingQueue.take();
                    try (BufferedReader bufferedReader = new BufferedReader(new FileReader(file))) {
                        String currentLine;
                        while ((currentLine = bufferedReader.readLine()) != null) {
                            String[] words = currentLine.split("\\s+");
                            for (String word : words) {
                                for (String searchWord : searchWords) {
                                    if (word.contains(searchWord)) {
                                        wordCount++;
                                    }
                                }
                            }
                        }
                    } catch (Exception e) {
                        // Handle error
                    }
                } catch (Exception e) {
                    // Handle error
                }
                return wordCount;
            };
            totalWordCount += executorService.submit(task).get();
        }
        System.out.println("Final word count=" + totalWordCount);
        executorService.shutdown();
    } catch (Exception e) {
        // Handle error
    }
}
Bohemian Replied on 2018-02-14 05:59:07Z

Yes, you're doing something wrong.

The problem is here:

executorService.submit(task).get()

Your code submits a task and then immediately waits for it to finish, so nothing runs in parallel; the tasks run sequentially. And your BlockingQueue adds no value whatsoever.

The way to run tasks in parallel is to first submit all tasks, collect the Futures returned, then call get() on all of them. Like this:

List<Future<Integer>> futures = filesInPath.stream()
    .map(<create your Callable>)
    .map(executorService::submit)
    .collect(toList());

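// Future must be parameterized; with the raw type, get() returns Object and += will not compile.
for (Future<Integer> future : futures) {
    totalWordCount += future.get();
}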
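
To make the snippet above concrete, here is a minimal, self-contained sketch of the submit-everything-then-get pattern. It is only one way to fill in the placeholder, not the asker's actual code: the class name ParallelWordCount, the helper countInFile, the use of java.nio.file.Path instead of File, and the temp files in main are all assumptions made for illustration. It assumes Java 9 or later (for List.of and Set.of).

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelWordCount {

    // Hypothetical helper: counts matches in one file with the same rule as
    // the question (a word counts once for every search word it contains).
    static int countInFile(Path file, Set<String> searchWords) throws IOException {
        int count = 0;
        try (BufferedReader reader = Files.newBufferedReader(file)) {
            String line;
            while ((line = reader.readLine()) != null) {
                for (String word : line.split("\\s+")) {
                    for (String searchWord : searchWords) {
                        if (word.contains(searchWord)) {
                            count++;
                        }
                    }
                }
            }
        }
        return count;
    }

    static int searchText(List<Path> files, Set<String> searchWords, int threadCount)
            throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(threadCount);
        try {
            // Phase 1: submit every task without waiting on any of them.
            List<Future<Integer>> futures = new ArrayList<>();
            for (Path file : files) {
                Callable<Integer> task = () -> countInFile(file, searchWords);
                futures.add(executor.submit(task));
            }
            // Phase 2: only now block on the results, one Future at a time.
            int total = 0;
            for (Future<Integer> future : futures) {
                total += future.get();
            }
            return total;
        } finally {
            // Always release the pool's threads, even if a task failed.
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        // Two small temp files stand in for the real input files.
        Path a = Files.createTempFile("words-a", ".txt");
        Path b = Files.createTempFile("words-b", ".txt");
        Files.write(a, List.of("foo bar", "foobar baz"));
        Files.write(b, List.of("bar bar"));
        int total = searchText(List.of(a, b), Set.of("foo", "bar"), 2);
        System.out.println("Final word count=" + total);
        Files.delete(a);
        Files.delete(b);
    }
}
```

The key difference from the question's code is that no get() is called until every task has been handed to the pool, so up to threadCount files can be read at the same time.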

You can actually do it in one stream: collect to the intermediate list (as above), then immediately stream that list again. You have to wrap the call to Future#get in some code to catch its checked exceptions, though - I leave that as an exercise for the reader.
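
For readers who want a starting point for the single-stream version, here is one possible sketch. The names StreamedFutures, getUnchecked and sumAll are made up for illustration, and rethrowing as IllegalStateException is just one choice of unchecked exception. Note that the intermediate collect matters: streams are lazy and push elements through one at a time, so without it each submit would be followed straight away by its own get and the tasks would run sequentially again.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

public class StreamedFutures {

    // Hypothetical helper: converts the checked exceptions thrown by
    // Future#get into an unchecked one so it can be used inside a stream.
    static int getUnchecked(Future<Integer> future) {
        try {
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new IllegalStateException("Interrupted while waiting for a task", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Task failed", e.getCause());
        }
    }

    static int sumAll(List<Callable<Integer>> tasks, ExecutorService executor) {
        return tasks.stream()
                .map(task -> executor.submit(task))
                .collect(Collectors.toList()) // every task is submitted before any get()
                .stream()
                .mapToInt(StreamedFutures::getUnchecked)
                .sum();
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            // Trivial tasks stand in for the per-file word counts.
            List<Callable<Integer>> tasks = new ArrayList<>();
            tasks.add(() -> 1);
            tasks.add(() -> 2);
            tasks.add(() -> 3);
            System.out.println("Total=" + sumAll(tasks, executor));
        } finally {
            executor.shutdown();
        }
    }
}
```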
