http-call-response
The http-call-response source receives the responses for the calls made by its corresponding http-call sink, and maps them from formats such as text and JSON.
To handle messages with different HTTP status codes having different formats, multiple http-call-response sources are
allowed to associate with a single http-call sink. It allows accessing the attributes of the event that initiated the call, and the response headers and properties via transport properties in the format trp:<attribute name> and trp:<header/property> respectively.
Syntax
CREATE SOURCE <NAME> WITH (type="http-call-response", map.type="<STRING>", sink.id="<STRING>", http.status.code="<STRING>", allow.streaming.responses="<BOOL>")
Query Parameters
| Name | Description | Default Value | Possible Data Types | Optional | Dynamic |
|---|---|---|---|---|---|
| sink.id | Identifier to correlate the http-call-response source with its corresponding http-call sink that published the messages. | STRING | No | No | |
| http.status.code | The matching http responses status code regex, that is used to filter the the messages which will be processed by the source. Eg: http.status.code = '200', http.status.code = '4\\d+' | 200 | STRING | Yes | No |
| allow.streaming.responses | Enable consuming responses on a streaming manner. | false | BOOL | Yes | No |
Example 1
CREATE SINK EmployeeRequestStream WITH (type='http-call', method='POST', publisher.url='http://localhost:8005/registry/employee', sink.id='employee-info', map.type='json') (name string, id int);
CREATE SOURCE EmployeeResponseStream WITH (type='http-call-response', sink.id='employee-info', http.status.code='2\\d+', map.type='json', map.attributes="name='trp:name', id='trp:id', location='$.town', age='$.age'") (name string, id int, location string, age int);
CREATE SOURCE EmployeeErrorStream WITH (type='http-call-response', sink.id='employee-info', http.status.code='4\\d+', map.type='text', map.regex.A='((.|\n)*)', map.attributes="error='A[1]'") (error string);
When events arrive in EmployeeRequestStream, http-call sink makes calls to endpoint on URL http://localhost:8005/registry/employee with
POST method and Content-Type application/json.
If the arriving event has attributes name:John and id:1423 it will send a message with default JSON mapping as follows:
{
"event": {
"name": "John",
"id": 1423
}
}
When the endpoint responds with status code in the range of 200 the message will be received by the http-call-response source associated
with the EmployeeResponseStream stream, because it is correlated with the sink by the same sink.id employee-info and as that expects
messages with http.status.code in regex format 2\\d+. If the response message is in the format:
{
"town": "NY",
"age": 24
}
the source maps the location and age attributes by executing JSON path on the message and maps the name and id attributes by
extracting them from the request event via as transport properties.
If the response status code is in the range of 400 then the message are received by the http-call-response source associated with the
EmployeeErrorStream stream, because it is correlated with the sink by the same sink.id employee-info and it expects messages with
http.status.code in regex format 4\\d+, and maps the error response to the error attribute of the event.