Home How to stop threadpool after main function is over in AWS FirehoseAsync putRecordAsync?
Reply: 0

How to stop threadpool after main function is over in AWS FirehoseAsync putRecordAsync?

Rahul
1#
Rahul Published in 2018-01-12 11:37:55Z

I am trying to use AWS Firehose as a log appender. My requirement is to put logs in the firehose in async mode. I have made the log4j appender following this github repo.

The problem I am facing is that AWS FirehoseAsync opens multiple threads in the background but when the tasks are over, the main process keeps on running even if there is nothing to execute.

public class MainTest {

    private static Logger log = LoggerFactory.getLogger(MainTest.class);


    public static void main(String[] args) throws UnsupportedEncodingException {

        String streamName = "kinesis-firehose stream";
        ClientConfiguration clientConfiguration = new ClientConfiguration();

        clientConfiguration.setMaxErrorRetry(5);
        clientConfiguration.setRetryPolicy(new RetryPolicy(PredefinedRetryPolicies.DEFAULT_RETRY_CONDITION,
                PredefinedRetryPolicies.DEFAULT_BACKOFF_STRATEGY, 5, true));
        clientConfiguration.setUserAgentPrefix(AppenderConstants.USER_AGENT_STRING);

        BlockingQueue<Runnable> taskBuffer = new LinkedBlockingDeque<Runnable>(1000);

        ThreadPoolExecutorFactory factory = new ThreadPoolExecutorFactory(3, taskBuffer);
        AmazonKinesisFirehoseAsync kinesisFirehoseAsync = AmazonKinesisFirehoseAsyncClientBuilder
                .standard().withCredentials(new DefaultAWSCredentialsProviderChain())
                .withClientConfiguration(clientConfiguration).withExecutorFactory(factory)
                .withRegion(ApplicationContext.getInstance().getAwsRegion()).build();

        for(int i=0; i<10; i++) {
            log.debug("Counter [{}]", i);
            String message = String.valueOf(i);
            ByteBuffer data = ByteBuffer.wrap(message.getBytes("UTF-8"));

            Record record = new Record().withData(data);
            PutRecordRequest request = new PutRecordRequest().withDeliveryStreamName(streamName).withRecord(record);
            kinesisFirehoseAsync.putRecordAsync(request);
        }

    }

}
You need to login account before you can post.

About| Privacy statement| Terms of Service| Advertising| Contact us| Help| Sitemap|
Processed in 0.326386 second(s) , Gzip On .

© 2016 Powered by mzan.com design MATCHINFO