How to access HttpServletRequest instance in a callback?

user46413, published on September 20, 2018, 4:36 pm

I am injecting an HttpServletRequest instance using the @Context annotation. For GET requests we respond asynchronously, and while responding we need to log an HTTP header from the request. If data is available we respond immediately; otherwise we wait for a 20-second timeout and send an empty response. A callback method is used for this. To log the request header we need access to the HttpServletRequest instance inside the callback method, and that is where I am stuck.
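The respond-immediately-or-time-out behaviour described above can be sketched in isolation as follows. This is only an illustration under assumptions, not the REST proxy's code: `LongPollSketch` and `poll` are hypothetical names, the pending read is modelled as a plain `CompletableFuture`, and `completeOnTimeout` requires Java 9 or later. A 50 ms timeout stands in for the 20 seconds.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class LongPollSketch {

    // Hypothetical helper: if the pending read already has records, they are
    // returned as-is; otherwise the future is completed with an empty list
    // once the timeout elapses.
    static CompletableFuture<List<String>> poll(CompletableFuture<List<String>> pendingRead,
                                                long timeoutMillis) {
        return pendingRead.completeOnTimeout(
                Collections.emptyList(), timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) {
        // Data is already available: respond immediately.
        CompletableFuture<List<String>> ready =
                CompletableFuture.completedFuture(List.of("r1", "r2"));
        System.out.println(poll(ready, 50).join()); // prints [r1, r2]

        // No data arrives: after the timeout an empty response is sent.
        CompletableFuture<List<String>> never = new CompletableFuture<>();
        System.out.println(poll(never, 50).join()); // prints []
    }
}
```

In the real resource the completion would be delivered through `asyncResponse.resume(...)` from the consumer's callback rather than through a future.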

Our resource code looks like this:

@Path("/mr")
// We include embedded formats here so you can always use these headers when interacting with
// a consumers resource. The few cases where it isn't safe are overridden per-method
@Produces({Versions.KAFKA_V1_JSON_BINARY_WEIGHTED_LOW, Versions.KAFKA_V1_JSON_AVRO_WEIGHTED_LOW,
           Versions.KAFKA_V1_JSON_JSON_WEIGHTED_LOW, Versions.KAFKA_V1_JSON_WEIGHTED,
           Versions.KAFKA_DEFAULT_JSON_WEIGHTED, Versions.JSON_WEIGHTED})
@Consumes({Versions.KAFKA_V1_JSON_BINARY, Versions.KAFKA_V1_JSON_AVRO, Versions.KAFKA_V1_JSON_JSON,
           Versions.KAFKA_V1_JSON, Versions.KAFKA_DEFAULT_JSON, Versions.JSON,
           Versions.GENERIC_REQUEST})
public class ConsumerResource {

  private static final Logger log = LoggerFactory.getLogger(ConsumerResource.class);

  private final Context ctx;
  ProduceResponse response = new ProduceResponse();


  public ConsumerResource(Context ctx) {
    this.ctx = ctx;
  }

  @javax.ws.rs.core.Context UriInfo uri;
  @javax.ws.rs.core.Context HttpHeaders httpHeaders;
  @javax.ws.rs.core.Context HttpServletRequest httpServletRequest;
  @javax.ws.rs.core.Context HttpServletResponse httpServletResponse;

  @GET
  @Path("/v1/consumer/{group}/topic/{topic}")
  @PerformanceMetric("ctxconsumerold.topic.read-json")
  @Produces({Versions.KAFKA_V1_JSON_JSON_WEIGHTED_LOW}) // Using low weight ensures binary is default
  public void readTopicJson(final @Suspended AsyncResponse asyncResponse,
                            final @PathParam("group") String group,
                            final @HeaderParam("Entity-Id") String instance,
                            final @HeaderParam("x-nssvc-serviceid") String serviceId,
                            final @PathParam("topic") String topic,
                            @QueryParam("max_bytes") @DefaultValue("-1") long maxBytes,
                            @QueryParam("request_timeout") @DefaultValue("-1") int reqtimeout) {
    Date now = new java.util.Date();
    if (! ctx.getSecurityRestrictions().isServiceAllowed(uri, httpHeaders, "Read", "Topic", topic))
      throw Errors.serviceBlockedException(ctx,httpServletResponse,httpServletRequest,now);
    readTopic(asyncResponse, group, instance + serviceId, topic, maxBytes, reqtimeout, JsonConsumerState.class);
    return;
  }

  private <KafkaK, KafkaV, ClientK, ClientV> void readTopic(
      final @Suspended AsyncResponse asyncResponse,
      final @PathParam("group") String group,
      final @HeaderParam("Entity-Id") String instance,
      final @PathParam("topic") String topic,
      @QueryParam("max_bytes") @DefaultValue("-1") long maxBytes, int reqTimeout,
      Class<? extends ConsumerState<KafkaK, KafkaV, ClientK, ClientV>> consumerStateType) {
    maxBytes = (maxBytes <= 0) ? Long.MAX_VALUE : maxBytes;
    ctx.getConsumerManager().readTopic(
        group, instance, topic, consumerStateType, maxBytes, reqTimeout,
        new ConsumerManager.ReadCallback<ClientK, ClientV>() {
          @Override
          public void onCompletion(List<? extends ConsumerRecord<ClientK, ClientV>> records, Exception e) {
            if (e != null) {
              asyncResponse.resume(e);
            } else {
              asyncResponse.resume(records);
            }
            // The IllegalStateException is thrown here: this runs on a consumer worker thread
            transactionId = httpServletRequest.getHeader("X-Cws-TransactionId");
          }
        });
  }
}

I am hitting an IllegalStateException at this line:

transactionId = httpServletRequest.getHeader("X-Cws-TransactionId");

This code is from the Kafka REST Proxy, which we are trying to modify. Here is the link to the original REST Proxy code:

https://github.com/confluentinc/kafka-rest/blob/master/kafka-rest/src/main/java/io/confluent/kafkarest/resources/ConsumersResource.java

The exception I am getting points to a scoping issue, but I can't figure out a way around it. Any comments are highly appreciated.

[2018-02-14 16:30:51,973] ERROR java.lang.IllegalStateException: Not inside a request scope. (io.confluent.kafkarest.ConsumerReadTask:213)
[2018-02-14 16:30:51,974] ERROR Not inside a request scope. (io.confluent.kafkarest.ConsumerReadTask:214)
java.lang.IllegalStateException: Not inside a request scope.
        at jersey.repackaged.com.google.common.base.Preconditions.checkState(Preconditions.java:173)
        at org.glassfish.jersey.process.internal.RequestScope.current(RequestScope.java:233)
        at org.glassfish.jersey.process.internal.RequestScope.findOrCreate(RequestScope.java:158)
        at org.jvnet.hk2.internal.MethodInterceptorImpl.invoke(MethodInterceptorImpl.java:74)
        at org.jvnet.hk2.internal.MethodInterceptorInvocationHandler.invoke(MethodInterceptorInvocationHandler.java:62)
        at com.sun.proxy.$Proxy49.getHeader(Unknown Source)
        at io.confluent.kafkarest.resources.ConsumerResource$3.onCompletion(ConsumerResource.java:336)
        at io.confluent.kafkarest.ConsumerManager$1.onCompletion(ConsumerManager.java:280)
        at io.confluent.kafkarest.ConsumerReadTask.finish(ConsumerReadTask.java:209)
        at io.confluent.kafkarest.ConsumerReadTask.finish(ConsumerReadTask.java:186)
        at io.confluent.kafkarest.ConsumerReadTask.doPartialRead(ConsumerReadTask.java:174)
        at io.confluent.kafkarest.ConsumerWorker.run(ConsumerWorker.java:94)
[2018-02-14 16:30:51,983] ERROR Consumer read callback threw an unhandled exception (io.confluent.kafkarest.ConsumerReadTask:216)
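
The trace above shows `getHeader` being called on a proxy (`$Proxy49`) from `ConsumerWorker.run`, that is, on a worker thread rather than the thread that handled the request. The following is a minimal, self-contained sketch of that situation, not Jersey's implementation: `RequestScopeSketch`, `Request`, `CURRENT` and `INJECTED` are hypothetical stand-ins, and the request scope is assumed to behave like a per-thread lookup. It shows the failing pattern next to one possible workaround, which is to copy the header into a local variable while still on the request thread and let the callback close over that value.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class RequestScopeSketch {

    // Hypothetical stand-in for HttpServletRequest, reduced to one method.
    interface Request {
        String getHeader(String name);
    }

    // Assumption: the request scope is modelled as one request per thread.
    static final ThreadLocal<Request> CURRENT = new ThreadLocal<>();

    // Stand-in for the injected proxy: it looks up the current request on
    // every call and fails when the calling thread has none.
    static final Request INJECTED = name -> {
        Request real = CURRENT.get();
        if (real == null) {
            throw new IllegalStateException("Not inside a request scope.");
        }
        return real.getHeader(name);
    };

    // Runs a callback on the worker thread and reports its result or failure.
    static String run(ExecutorService worker, Callable<String> callback) {
        try {
            return worker.submit(callback).get();
        } catch (ExecutionException e) {
            return "failed: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    static String[] demo() {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        // The calling thread plays the role of the request thread.
        CURRENT.set(name -> "X-Cws-TransactionId".equals(name) ? "abc-123" : null);
        try {
            // Failing pattern: the callback touches the proxy on the worker thread.
            String broken = run(worker, () -> INJECTED.getHeader("X-Cws-TransactionId"));

            // Workaround: read the header on the request thread, capture the value.
            final String transactionId = INJECTED.getHeader("X-Cws-TransactionId");
            String fixed = run(worker, () -> "txn=" + transactionId);

            return new String[] {broken, fixed};
        } finally {
            CURRENT.remove();
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] results = demo();
        System.out.println(results[0]); // prints failed: Not inside a request scope.
        System.out.println(results[1]); // prints txn=abc-123
    }
}
```

Applied to the resource above, the analogous step would presumably be to read `X-Cws-TransactionId` inside `readTopicJson` (or receive it as a `@HeaderParam`, as is already done for `Entity-Id`) and pass the resulting `String` into `readTopic`, so that `onCompletion` never touches `httpServletRequest`.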