
How to access the HttpServletRequest instance inside a callback?

moodylearner Published in 2018-02-14 11:18:53Z

I am injecting an HttpServletRequest instance using the @Context annotation. For GET requests we respond asynchronously, and when responding we need to log a request HTTP header. If data is available we respond immediately; otherwise we wait for a 20-second timeout and send an empty response. A callback method is used for this. To log the request header, we need access to the HttpServletRequest instance inside the callback method, and that is where I am stuck.

The code looks like this:

@Path("/mr")
// We include embedded formats here so you can always use these headers when interacting with
// a consumers resource. The few cases where it isn't safe are overridden per-method
@Produces({Versions.KAFKA_V1_JSON_BINARY_WEIGHTED_LOW, Versions.KAFKA_V1_JSON_AVRO_WEIGHTED_LOW,
           Versions.KAFKA_V1_JSON_JSON_WEIGHTED_LOW, Versions.KAFKA_V1_JSON_WEIGHTED,
           Versions.KAFKA_DEFAULT_JSON_WEIGHTED, Versions.JSON_WEIGHTED})
@Consumes({Versions.KAFKA_V1_JSON_BINARY, Versions.KAFKA_V1_JSON_AVRO, Versions.KAFKA_V1_JSON_JSON,
           Versions.KAFKA_V1_JSON, Versions.KAFKA_DEFAULT_JSON, Versions.JSON,
           Versions.GENERIC_REQUEST})
public class ConsumerResource {

  private static final Logger log = LoggerFactory.getLogger(ConsumerResource.class);

  private final Context ctx;
  ProduceResponse response = new ProduceResponse();


  public ConsumerResource(Context ctx) {
    this.ctx = ctx;
  }

  @javax.ws.rs.core.Context UriInfo uri;
  @javax.ws.rs.core.Context HttpHeaders httpHeaders;
  @javax.ws.rs.core.Context HttpServletRequest httpServletRequest;
  @javax.ws.rs.core.Context HttpServletResponse httpServletResponse;

  @GET
  @Path("/v1/consumer/{group}/topic/{topic}")
  @PerformanceMetric("ctxconsumerold.topic.read-json")
  @Produces({Versions.KAFKA_V1_JSON_JSON_WEIGHTED_LOW}) // Using low weight ensures binary is default
  public void readTopicJson(final @Suspended AsyncResponse asyncResponse,
                            final @PathParam("group") String group,
                            final @HeaderParam("Entity-Id") String instance,
                            final @HeaderParam("x-nssvc-serviceid") String serviceId,
                            final @PathParam("topic") String topic,
                            @QueryParam("max_bytes") @DefaultValue("-1") long maxBytes,
                            @QueryParam("request_timeout") @DefaultValue("-1") int reqtimeout) {
    Date now = new java.util.Date();
    if (! ctx.getSecurityRestrictions().isServiceAllowed(uri, httpHeaders, "Read", "Topic", topic))
      throw Errors.serviceBlockedException(ctx,httpServletResponse,httpServletRequest,now);
    readTopic(asyncResponse, group, instance + serviceId, topic, maxBytes, reqtimeout, JsonConsumerState.class);
    return;
  }

  private <KafkaK, KafkaV, ClientK, ClientV> void readTopic(
      final @Suspended AsyncResponse asyncResponse,
      final @PathParam("group") String group,
      final @HeaderParam("Entity-Id") String instance,
      final @PathParam("topic") String topic,
      @QueryParam("max_bytes") @DefaultValue("-1") long maxBytes, int reqTimeout,
      Class<? extends ConsumerState<KafkaK, KafkaV, ClientK, ClientV>> consumerStateType) {
    maxBytes = (maxBytes <= 0) ? Long.MAX_VALUE : maxBytes;
    ctx.getConsumerManager().readTopic(
        group, instance, topic, consumerStateType, maxBytes, reqTimeout,
        new ConsumerManager.ReadCallback<ClientK, ClientV>() {
          @Override
          public void onCompletion(List<? extends ConsumerRecord<ClientK, ClientV>> records, Exception e) {
            if (e != null) {
              asyncResponse.resume(e);
            } else {
              asyncResponse.resume(records);
            }
            // The IllegalStateException is thrown on the next line.
            transactionId = httpServletRequest.getHeader("X-Cws-TransactionId");
          }
        });
  }

I am hitting an IllegalStateException at this line:

transactionId = httpServletRequest.getHeader("X-Cws-TransactionId");

This code comes from the Kafka REST Proxy, which we are trying to modify. Here is a link to the original REST Proxy code:

https://github.com/confluentinc/kafka-rest/blob/master/kafka-rest/src/main/java/io/confluent/kafkarest/resources/ConsumersResource.java

The exception I am getting points to a scoping issue, but I can't figure out a way around it. Any comments are highly appreciated.

[2018-02-14 16:30:51,973] ERROR java.lang.IllegalStateException: Not inside a request scope. (io.confluent.kafkarest.ConsumerReadTask:213)
[2018-02-14 16:30:51,974] ERROR Not inside a request scope. (io.confluent.kafkarest.ConsumerReadTask:214)
java.lang.IllegalStateException: Not inside a request scope.
        at jersey.repackaged.com.google.common.base.Preconditions.checkState(Preconditions.java:173)
        at org.glassfish.jersey.process.internal.RequestScope.current(RequestScope.java:233)
        at org.glassfish.jersey.process.internal.RequestScope.findOrCreate(RequestScope.java:158)
        at org.jvnet.hk2.internal.MethodInterceptorImpl.invoke(MethodInterceptorImpl.java:74)
        at org.jvnet.hk2.internal.MethodInterceptorInvocationHandler.invoke(MethodInterceptorInvocationHandler.java:62)
        at com.sun.proxy.$Proxy49.getHeader(Unknown Source)
        at io.confluent.kafkarest.resources.ConsumerResource$3.onCompletion(ConsumerResource.java:336)
        at io.confluent.kafkarest.ConsumerManager$1.onCompletion(ConsumerManager.java:280)
        at io.confluent.kafkarest.ConsumerReadTask.finish(ConsumerReadTask.java:209)
        at io.confluent.kafkarest.ConsumerReadTask.finish(ConsumerReadTask.java:186)
        at io.confluent.kafkarest.ConsumerReadTask.doPartialRead(ConsumerReadTask.java:174)
        at io.confluent.kafkarest.ConsumerWorker.run(ConsumerWorker.java:94)
[2018-02-14 16:30:51,983] ERROR Consumer read callback threw an unhandled exception (io.confluent.kafkarest.ConsumerReadTask:216)
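
For anyone reading the trace above: the call goes through a proxy (`$Proxy49.getHeader`) into `RequestScope.current`, and it runs on the `ConsumerWorker` thread rather than on the thread that received the request. One common way around this kind of problem is to read the header while still on the request thread and let the callback capture the plain String. The sketch below is only a plain-Java simulation of that idea, not Jersey code: `REQUEST_SCOPE`, `getHeader` and `RequestScopeSketch` are hypothetical stand-ins, and a `ThreadLocal` plays the role of the request scope.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class RequestScopeSketch {

    // Hypothetical stand-in for the request scope: headers are visible
    // only on the thread that is currently handling the request.
    static final ThreadLocal<Map<String, String>> REQUEST_SCOPE = new ThreadLocal<>();

    // Hypothetical stand-in for the injected proxy: every call looks up
    // the scope of the calling thread and fails if there is none.
    static String getHeader(String name) {
        Map<String, String> headers = REQUEST_SCOPE.get();
        if (headers == null) {
            throw new IllegalStateException("Not inside a request scope.");
        }
        return headers.get(name);
    }

    static String[] run() throws Exception {
        // Single worker thread, playing the role of the consumer worker.
        ExecutorService worker = Executors.newSingleThreadExecutor();
        // Simulate the start of request handling on the current thread.
        REQUEST_SCOPE.set(Collections.singletonMap("X-Cws-TransactionId", "tx-123"));
        try {
            // Read the header on the request thread; the callback then
            // captures an ordinary String instead of the scoped proxy.
            final String transactionId = getHeader("X-Cws-TransactionId");
            Future<String> captured = worker.submit(() -> "captured: " + transactionId);

            // Reading the header lazily on the worker thread fails,
            // because that thread has no request scope.
            Future<String> lazy = worker.submit(() -> {
                try {
                    return "lazy: " + getHeader("X-Cws-TransactionId");
                } catch (IllegalStateException e) {
                    return "lazy: " + e.getMessage();
                }
            });
            return new String[] {captured.get(), lazy.get()};
        } finally {
            REQUEST_SCOPE.remove();
            worker.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        for (String line : run()) {
            System.out.println(line);
        }
    }
}
```

Applied to the resource in the question, this would presumably mean calling `httpServletRequest.getHeader("X-Cws-TransactionId")` in `readTopicJson`, before `readTopic` is invoked, and passing the resulting String down as a final parameter so that `onCompletion` never touches the injected request object. Whether that fits the rest of the modified proxy is an assumption on my part.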