fix(bridge): improve the schema of connector_http

This commit is contained in:
Shawn 2021-10-21 14:49:53 +08:00
parent dd9e2c4b24
commit d046f9c6e7
5 changed files with 168 additions and 68 deletions

View File

@ -72,7 +72,7 @@
# retain = false
#}
## HTTP bridges to a http server
## HTTP bridges to an HTTP server
bridges.http.my_http_bridge {
## NOTE: we cannot use placehodler variables in the `scheme://host:port` part of the url string
url = "http://localhost:9901/messages/${topic}"

View File

@ -67,7 +67,7 @@ on_message_publish(Message = #message{topic = Topic, flags = Flags}) ->
case maps:get(sys, Flags, false) of
false ->
lists:foreach(fun (Id) ->
send_message(Id, emqx_message:to_map(Message))
send_message(Id, emqx_rule_events:eventmsg_publish(Message))
end, get_matched_bridges(Topic));
true -> ok
end,

View File

@ -91,11 +91,17 @@ parse_http_confs(#{ <<"url">> := Url
, <<"method">> := Method
, <<"body">> := Body
, <<"headers">> := Headers
, <<"request_timeout">> := ReqTimeout
} = Conf) ->
{BaseUrl, Path} = parse_url(Url),
Conf#{ <<"base_url">> => BaseUrl
, <<"preprocessed_request">> =>
emqx_connector_http:preprocess_request(Method, Path, Body, Headers)
, <<"request">> =>
#{ <<"path">> => Path
, <<"method">> => Method
, <<"body">> => Body
, <<"headers">> => Headers
, <<"request_timeout">> => ReqTimeout
}
}.
parse_url(Url) ->

View File

@ -10,8 +10,14 @@
roots() -> [bridges].
fields(bridges) ->
[ {mqtt, hoconsc:mk(hoconsc:map(name, hoconsc:ref(?MODULE, "mqtt_bridge")))}
, {http, hoconsc:mk(hoconsc:map(name, hoconsc:ref(?MODULE, "http_bridge")))}
[ {mqtt,
sc(hoconsc:map(name, ref("mqtt_bridge")),
#{ desc => "MQTT bridges"
})}
, {http,
sc(hoconsc:map(name, ref("http_bridge")),
#{ desc => "HTTP bridges"
})}
];
fields("mqtt_bridge") ->
@ -19,19 +25,64 @@ fields("mqtt_bridge") ->
fields("http_bridge") ->
basic_config_http() ++
[ {url, hoconsc:mk(binary())}
, {from_local_topic, hoconsc:mk(binary())}
, {method, hoconsc:mk(method(), #{default => post})}
, {headers, hoconsc:mk(map(),
[ {url,
sc(binary(),
#{ nullable => false
, desc =>"""
The URL of the HTTP Bridge.<br>
Template with variables is allowed in the path, but variables cannot be used in the scheme, host,
or port part.<br>
For example, <code> http://localhost:9901/${topic} </code> is allowed, but
<code> http://${host}:9901/message </code> or <code> http://localhost:${port}/message </code>
is not allowed.
"""
})}
, {from_local_topic,
sc(binary(),
#{ desc =>"""
The MQTT topic filter to be forwarded to the HTTP server. All MQTT PUBLISH messages which topic
match the from_local_topic will be forwarded.<br>
NOTE: if this bridge is used as the output of a rule (emqx rule engine), and also from_local_topic is configured, then both the data got from the rule and the MQTT messages that matches
from_local_topic will be forwarded.
"""
})}
, {method,
sc(method(),
#{ default => post
, desc =>"""
The method of the HTTP request. All the available methods are: post, put, get, delete.<br>
Template with variables is allowed.<br>
"""
})}
, {headers,
sc(map(),
#{ default => #{
<<"accept">> => <<"application/json">>,
<<"cache-control">> => <<"no-cache">>,
<<"connection">> => <<"keep-alive">>,
<<"content-type">> => <<"application/json">>,
<<"keep-alive">> => <<"timeout=5">>}})
<<"keep-alive">> => <<"timeout=5">>}
, desc =>"""
The headers of the HTTP request.<br>
Template with variables is allowed.
"""
})
}
, {body, hoconsc:mk(binary(), #{default => <<"${payload}">>})}
, {request_timeout, hoconsc:mk(emqx_schema:duration_ms(), #{default => <<"30s">>})}
, {body,
sc(binary(),
#{ default => <<"${payload}">>
, desc =>"""
The body of the HTTP request.<br>
Template with variables is allowed.
"""
})}
, {request_timeout,
sc(emqx_schema:duration_ms(),
#{ default => <<"30s">>
, desc =>"""
How long will the HTTP request timeout.
"""
})}
].
basic_config_http() ->
@ -39,3 +90,7 @@ basic_config_http() ->
method() ->
hoconsc:enum([post, put, get, delete]).
sc(Type, Meta) -> hoconsc:mk(Type, Meta).
ref(Field) -> hoconsc:ref(?MODULE, Field).

View File

@ -39,7 +39,6 @@
, validations/0]).
-export([ check_ssl_opts/2
, preprocess_request/4
]).
-type connect_timeout() :: emqx_schema:duration() | infinity.
@ -55,50 +54,77 @@ roots() ->
fields(config).
fields(config) ->
[ {base_url, fun base_url/1}
, {connect_timeout, fun connect_timeout/1}
, {max_retries, fun max_retries/1}
, {retry_interval, fun retry_interval/1}
, {pool_type, fun pool_type/1}
, {pool_size, fun pool_size/1}
, {enable_pipelining, fun enable_pipelining/1}
, {preprocessed_request, hoconsc:mk(map())}
] ++ emqx_connector_schema_lib:ssl_fields().
[ {base_url,
sc(url(),
#{ nullable => false
, validator => fun(#{query := _Query}) ->
{error, "There must be no query in the base_url"};
(_) -> ok
end
, desc => """
The base URL is the URL includes only the scheme, host and port.<br>
When send an HTTP request, the real URL to be used is the concatenation of the base URL and the
path parameter (passed by the emqx_resource:query/2,3 or provided by the request parameter).<br>
For example: http://localhost:9901/
"""
})}
, {connect_timeout,
sc(emqx_schema:duration_ms(),
#{ default => "30s"
, desc => "The timeout when connecting to the HTTP server"
})}
, {max_retries,
sc(non_neg_integer(),
#{ default => 5
, desc => "Max retry times if error on sending request"
})}
, {retry_interval,
sc(emqx_schema:duration(),
#{ default => "1s"
, desc => "Interval before next retry if error on sending request"
})}
, {pool_type,
sc(pool_type(),
#{ default => random
, desc => "The type of the pool. Canbe one of random, hash"
})}
, {pool_size,
sc(non_neg_integer(),
#{ default => 8
, desc => "The pool size"
})}
, {enable_pipelining,
sc(boolean(),
#{ default => true
, desc => "Enable the HTTP pipeline"
})}
, {request, hoconsc:mk(
ref("request"),
#{ default => undefined
, desc => """
If the request is provided, the caller can send HTTP requests via
<code>emqx_resource:query(ResourceId, {send_message, BridgeId, Message})</code>
"""
})}
] ++ emqx_connector_schema_lib:ssl_fields();
fields("request") ->
[ {method, hoconsc:mk(hoconsc:enum([post, put, get, delete]), #{})}
, {path, hoconsc:mk(binary(), #{})}
, {body, hoconsc:mk(binary(), #{})}
, {headers, hoconsc:mk(map(), #{})}
, {request_timeout,
sc(emqx_schema:duration_ms(),
#{ default => "30s"
, desc => "The timeout when sending request to the HTTP server"
})}
].
validations() ->
[ {check_ssl_opts, fun check_ssl_opts/1} ].
base_url(type) -> url();
base_url(nullable) -> false;
base_url(validator) -> fun(#{query := _Query}) ->
{error, "There must be no query in the base_url"};
(_) -> ok
end;
base_url(_) -> undefined.
connect_timeout(type) -> emqx_schema:duration_ms();
connect_timeout(default) -> <<"5s">>;
connect_timeout(_) -> undefined.
max_retries(type) -> non_neg_integer();
max_retries(default) -> 5;
max_retries(_) -> undefined.
retry_interval(type) -> emqx_schema:duration();
retry_interval(default) -> <<"1s">>;
retry_interval(_) -> undefined.
pool_type(type) -> pool_type();
pool_type(default) -> hash;
pool_type(_) -> undefined.
pool_size(type) -> non_neg_integer();
pool_size(default) -> 8;
pool_size(_) -> undefined.
enable_pipelining(type) -> boolean();
enable_pipelining(default) -> true;
enable_pipelining(_) -> undefined.
sc(Type, Meta) -> hoconsc:mk(Type, Meta).
ref(Field) -> hoconsc:ref(?MODULE, Field).
%% ===================================================================
on_start(InstId, #{base_url := #{scheme := Scheme,
@ -136,7 +162,8 @@ on_start(InstId, #{base_url := #{scheme := Scheme,
pool_name => PoolName,
host => Host,
port => Port,
base_path => BasePath
base_path => BasePath,
request => preprocess_request(maps:get(request, Config, undefined))
},
case ehttpc_sup:start_pool(PoolName, PoolOpts) of
{ok, _} -> {ok, State};
@ -151,9 +178,9 @@ on_stop(InstId, #{pool_name := PoolName}) ->
ehttpc_sup:stop_pool(PoolName).
on_query(InstId, {send_message, BridgeId, Msg}, AfterQuery, State) ->
case maps:find(preprocessed_request, State) of
error -> ?SLOG(error, #{msg => "preprocessed_request found", bridge_id => BridgeId});
{ok, Request} ->
case maps:get(request, State, undefined) of
undefined -> ?SLOG(error, #{msg => "request not found", bridge_id => BridgeId});
Request ->
#{method := Method, path := Path, body := Body, headers := Headers,
request_timeout := Timeout} = process_request(Request, Msg),
on_query(InstId, {Method, {Path, Headers, Body}, Timeout}, AfterQuery, State)
@ -194,11 +221,20 @@ on_health_check(_InstId, #{host := Host, port := Port} = State) ->
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
preprocess_request(Method, Path, Body, Headers) ->
preprocess_request(undefined) ->
undefined;
preprocess_request(#{
method := Method,
path := Path,
body := Body,
headers := Headers,
request_timeout := ReqTimeout
}) ->
#{ method => emqx_plugin_libs_rule:preproc_tmpl(bin(Method))
, path => emqx_plugin_libs_rule:preproc_tmpl(Path)
, body => emqx_plugin_libs_rule:preproc_tmpl(Body)
, headers => preproc_headers(Headers)
, request_timeout => ReqTimeout
}.
preproc_headers(Headers) ->
@ -211,11 +247,14 @@ process_request(#{
method := MethodTks,
path := PathTks,
body := BodyTks,
headers := HeadersTks} = Conf, Msg) ->
headers := HeadersTks,
request_timeout := ReqTimeout
} = Conf, Msg) ->
Conf#{ method => make_method(emqx_plugin_libs_rule:proc_tmpl(MethodTks, Msg))
, path => emqx_plugin_libs_rule:proc_tmpl(PathTks, Msg)
, body => emqx_plugin_libs_rule:proc_tmpl(BodyTks, Msg)
, headers => maps:to_list(proc_headers(HeadersTks, Msg))
, request_timeout => ReqTimeout
}.
proc_headers(HeaderTks, Msg) ->