diff --git a/apps/emqx_bridge/etc/emqx_bridge.conf b/apps/emqx_bridge/etc/emqx_bridge.conf index 632a0a406..e7e5dffbb 100644 --- a/apps/emqx_bridge/etc/emqx_bridge.conf +++ b/apps/emqx_bridge/etc/emqx_bridge.conf @@ -72,7 +72,7 @@ # retain = false #} -## HTTP bridges to a http server +## HTTP bridges to an HTTP server bridges.http.my_http_bridge { ## NOTE: we cannot use placehodler variables in the `scheme://host:port` part of the url string url = "http://localhost:9901/messages/${topic}" diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index 8fbd87c64..06f1ba6c9 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -67,7 +67,7 @@ on_message_publish(Message = #message{topic = Topic, flags = Flags}) -> case maps:get(sys, Flags, false) of false -> lists:foreach(fun (Id) -> - send_message(Id, emqx_message:to_map(Message)) + send_message(Id, emqx_rule_events:eventmsg_publish(Message)) end, get_matched_bridges(Topic)); true -> ok end, diff --git a/apps/emqx_bridge/src/emqx_bridge_monitor.erl b/apps/emqx_bridge/src/emqx_bridge_monitor.erl index 78af4ba41..c13c8629c 100644 --- a/apps/emqx_bridge/src/emqx_bridge_monitor.erl +++ b/apps/emqx_bridge/src/emqx_bridge_monitor.erl @@ -91,11 +91,17 @@ parse_http_confs(#{ <<"url">> := Url , <<"method">> := Method , <<"body">> := Body , <<"headers">> := Headers + , <<"request_timeout">> := ReqTimeout } = Conf) -> {BaseUrl, Path} = parse_url(Url), Conf#{ <<"base_url">> => BaseUrl - , <<"preprocessed_request">> => - emqx_connector_http:preprocess_request(Method, Path, Body, Headers) + , <<"request">> => + #{ <<"path">> => Path + , <<"method">> => Method + , <<"body">> => Body + , <<"headers">> => Headers + , <<"request_timeout">> => ReqTimeout + } }. parse_url(Url) -> diff --git a/apps/emqx_bridge/src/emqx_bridge_schema.erl b/apps/emqx_bridge/src/emqx_bridge_schema.erl index 86a34699d..acab647c2 100644 --- a/apps/emqx_bridge/src/emqx_bridge_schema.erl +++ b/apps/emqx_bridge/src/emqx_bridge_schema.erl @@ -10,8 +10,14 @@ roots() -> [bridges]. fields(bridges) -> - [ {mqtt, hoconsc:mk(hoconsc:map(name, hoconsc:ref(?MODULE, "mqtt_bridge")))} - , {http, hoconsc:mk(hoconsc:map(name, hoconsc:ref(?MODULE, "http_bridge")))} + [ {mqtt, + sc(hoconsc:map(name, ref("mqtt_bridge")), + #{ desc => "MQTT bridges" + })} + , {http, + sc(hoconsc:map(name, ref("http_bridge")), + #{ desc => "HTTP bridges" + })} ]; fields("mqtt_bridge") -> @@ -19,19 +25,64 @@ fields("mqtt_bridge") -> fields("http_bridge") -> basic_config_http() ++ - [ {url, hoconsc:mk(binary())} - , {from_local_topic, hoconsc:mk(binary())} - , {method, hoconsc:mk(method(), #{default => post})} - , {headers, hoconsc:mk(map(), - #{default => #{ - <<"accept">> => <<"application/json">>, - <<"cache-control">> => <<"no-cache">>, - <<"connection">> => <<"keep-alive">>, - <<"content-type">> => <<"application/json">>, - <<"keep-alive">> => <<"timeout=5">>}}) + [ {url, + sc(binary(), + #{ nullable => false + , desc =>""" +The URL of the HTTP Bridge.
+Template with variables is allowed in the path, but variables cannot be used in the scheme, host, +or port part.
+For example, http://localhost:9901/${topic} is allowed, but + http://${host}:9901/message or http://localhost:${port}/message +is not allowed. +""" + })} + , {from_local_topic, + sc(binary(), + #{ desc =>""" +The MQTT topic filter to be forwarded to the HTTP server. All MQTT PUBLISH messages which topic +match the from_local_topic will be forwarded.
+NOTE: if this bridge is used as the output of a rule (emqx rule engine), and also from_local_topic is configured, then both the data got from the rule and the MQTT messages that matches +from_local_topic will be forwarded. +""" + })} + , {method, + sc(method(), + #{ default => post + , desc =>""" +The method of the HTTP request. All the available methods are: post, put, get, delete.
+Template with variables is allowed.
+""" + })} + , {headers, + sc(map(), + #{ default => #{ + <<"accept">> => <<"application/json">>, + <<"cache-control">> => <<"no-cache">>, + <<"connection">> => <<"keep-alive">>, + <<"content-type">> => <<"application/json">>, + <<"keep-alive">> => <<"timeout=5">>} + , desc =>""" +The headers of the HTTP request.
+Template with variables is allowed. +""" + }) } - , {body, hoconsc:mk(binary(), #{default => <<"${payload}">>})} - , {request_timeout, hoconsc:mk(emqx_schema:duration_ms(), #{default => <<"30s">>})} + , {body, + sc(binary(), + #{ default => <<"${payload}">> + , desc =>""" +The body of the HTTP request.
+Template with variables is allowed. +""" + })} + , {request_timeout, + sc(emqx_schema:duration_ms(), + #{ default => <<"30s">> + , desc =>""" +How long will the HTTP request timeout. +""" + })} ]. basic_config_http() -> @@ -39,3 +90,7 @@ basic_config_http() -> method() -> hoconsc:enum([post, put, get, delete]). + +sc(Type, Meta) -> hoconsc:mk(Type, Meta). + +ref(Field) -> hoconsc:ref(?MODULE, Field). diff --git a/apps/emqx_connector/src/emqx_connector_http.erl b/apps/emqx_connector/src/emqx_connector_http.erl index 0c5f377ba..388e56919 100644 --- a/apps/emqx_connector/src/emqx_connector_http.erl +++ b/apps/emqx_connector/src/emqx_connector_http.erl @@ -39,7 +39,6 @@ , validations/0]). -export([ check_ssl_opts/2 - , preprocess_request/4 ]). -type connect_timeout() :: emqx_schema:duration() | infinity. @@ -55,50 +54,77 @@ roots() -> fields(config). fields(config) -> - [ {base_url, fun base_url/1} - , {connect_timeout, fun connect_timeout/1} - , {max_retries, fun max_retries/1} - , {retry_interval, fun retry_interval/1} - , {pool_type, fun pool_type/1} - , {pool_size, fun pool_size/1} - , {enable_pipelining, fun enable_pipelining/1} - , {preprocessed_request, hoconsc:mk(map())} - ] ++ emqx_connector_schema_lib:ssl_fields(). + [ {base_url, + sc(url(), + #{ nullable => false + , validator => fun(#{query := _Query}) -> + {error, "There must be no query in the base_url"}; + (_) -> ok + end + , desc => """ +The base URL is the URL includes only the scheme, host and port.
+When send an HTTP request, the real URL to be used is the concatenation of the base URL and the +path parameter (passed by the emqx_resource:query/2,3 or provided by the request parameter).
+For example: http://localhost:9901/ +""" + })} + , {connect_timeout, + sc(emqx_schema:duration_ms(), + #{ default => "30s" + , desc => "The timeout when connecting to the HTTP server" + })} + , {max_retries, + sc(non_neg_integer(), + #{ default => 5 + , desc => "Max retry times if error on sending request" + })} + , {retry_interval, + sc(emqx_schema:duration(), + #{ default => "1s" + , desc => "Interval before next retry if error on sending request" + })} + , {pool_type, + sc(pool_type(), + #{ default => random + , desc => "The type of the pool. Canbe one of random, hash" + })} + , {pool_size, + sc(non_neg_integer(), + #{ default => 8 + , desc => "The pool size" + })} + , {enable_pipelining, + sc(boolean(), + #{ default => true + , desc => "Enable the HTTP pipeline" + })} + , {request, hoconsc:mk( + ref("request"), + #{ default => undefined + , desc => """ +If the request is provided, the caller can send HTTP requests via +emqx_resource:query(ResourceId, {send_message, BridgeId, Message}) +""" + })} + ] ++ emqx_connector_schema_lib:ssl_fields(); + +fields("request") -> + [ {method, hoconsc:mk(hoconsc:enum([post, put, get, delete]), #{})} + , {path, hoconsc:mk(binary(), #{})} + , {body, hoconsc:mk(binary(), #{})} + , {headers, hoconsc:mk(map(), #{})} + , {request_timeout, + sc(emqx_schema:duration_ms(), + #{ default => "30s" + , desc => "The timeout when sending request to the HTTP server" + })} + ]. validations() -> [ {check_ssl_opts, fun check_ssl_opts/1} ]. -base_url(type) -> url(); -base_url(nullable) -> false; -base_url(validator) -> fun(#{query := _Query}) -> - {error, "There must be no query in the base_url"}; - (_) -> ok - end; -base_url(_) -> undefined. - -connect_timeout(type) -> emqx_schema:duration_ms(); -connect_timeout(default) -> <<"5s">>; -connect_timeout(_) -> undefined. - -max_retries(type) -> non_neg_integer(); -max_retries(default) -> 5; -max_retries(_) -> undefined. - -retry_interval(type) -> emqx_schema:duration(); -retry_interval(default) -> <<"1s">>; -retry_interval(_) -> undefined. - -pool_type(type) -> pool_type(); -pool_type(default) -> hash; -pool_type(_) -> undefined. - -pool_size(type) -> non_neg_integer(); -pool_size(default) -> 8; -pool_size(_) -> undefined. - -enable_pipelining(type) -> boolean(); -enable_pipelining(default) -> true; -enable_pipelining(_) -> undefined. +sc(Type, Meta) -> hoconsc:mk(Type, Meta). +ref(Field) -> hoconsc:ref(?MODULE, Field). %% =================================================================== on_start(InstId, #{base_url := #{scheme := Scheme, @@ -136,7 +162,8 @@ on_start(InstId, #{base_url := #{scheme := Scheme, pool_name => PoolName, host => Host, port => Port, - base_path => BasePath + base_path => BasePath, + request => preprocess_request(maps:get(request, Config, undefined)) }, case ehttpc_sup:start_pool(PoolName, PoolOpts) of {ok, _} -> {ok, State}; @@ -151,9 +178,9 @@ on_stop(InstId, #{pool_name := PoolName}) -> ehttpc_sup:stop_pool(PoolName). on_query(InstId, {send_message, BridgeId, Msg}, AfterQuery, State) -> - case maps:find(preprocessed_request, State) of - error -> ?SLOG(error, #{msg => "preprocessed_request found", bridge_id => BridgeId}); - {ok, Request} -> + case maps:get(request, State, undefined) of + undefined -> ?SLOG(error, #{msg => "request not found", bridge_id => BridgeId}); + Request -> #{method := Method, path := Path, body := Body, headers := Headers, request_timeout := Timeout} = process_request(Request, Msg), on_query(InstId, {Method, {Path, Headers, Body}, Timeout}, AfterQuery, State) @@ -194,11 +221,20 @@ on_health_check(_InstId, #{host := Host, port := Port} = State) -> %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- -preprocess_request(Method, Path, Body, Headers) -> +preprocess_request(undefined) -> + undefined; +preprocess_request(#{ + method := Method, + path := Path, + body := Body, + headers := Headers, + request_timeout := ReqTimeout + }) -> #{ method => emqx_plugin_libs_rule:preproc_tmpl(bin(Method)) , path => emqx_plugin_libs_rule:preproc_tmpl(Path) , body => emqx_plugin_libs_rule:preproc_tmpl(Body) , headers => preproc_headers(Headers) + , request_timeout => ReqTimeout }. preproc_headers(Headers) -> @@ -208,14 +244,17 @@ preproc_headers(Headers) -> end, #{}, Headers). process_request(#{ - method := MethodTks, - path := PathTks, - body := BodyTks, - headers := HeadersTks} = Conf, Msg) -> + method := MethodTks, + path := PathTks, + body := BodyTks, + headers := HeadersTks, + request_timeout := ReqTimeout + } = Conf, Msg) -> Conf#{ method => make_method(emqx_plugin_libs_rule:proc_tmpl(MethodTks, Msg)) , path => emqx_plugin_libs_rule:proc_tmpl(PathTks, Msg) , body => emqx_plugin_libs_rule:proc_tmpl(BodyTks, Msg) , headers => maps:to_list(proc_headers(HeadersTks, Msg)) + , request_timeout => ReqTimeout }. proc_headers(HeaderTks, Msg) ->