diff --git a/apps/emqx_gateway/src/emqx_gateway_api.erl b/apps/emqx_gateway/src/emqx_gateway_api.erl index 2ff002fd5..9c9398945 100644 --- a/apps/emqx_gateway/src/emqx_gateway_api.erl +++ b/apps/emqx_gateway/src/emqx_gateway_api.erl @@ -18,12 +18,6 @@ -behaviour(minirest_api). --compile(nowarn_unused_function). - --import(emqx_mgmt_util, - [ schema/1 - ]). - -import(emqx_gateway_http, [ return_http_error/2 ]). @@ -37,18 +31,160 @@ , gateway_insta_stats/2 ]). --define(EXAMPLE_GATEWAY_LIST, - [ #{ name => <<"lwm2m">> - , status => <<"running">> - , started_at => <<"2021-08-19T11:45:56.006373+08:00">> - , max_connection => 1024000 - , current_connection => 1000 - , listeners => [ - #{name => <<"lw-udp-1">>, status => <<"activing">>}, - #{name => <<"lw-udp-2">>, status => <<"inactived">>} - ] - } - ]). +%%-------------------------------------------------------------------- +%% minirest behaviour callbacks +%%-------------------------------------------------------------------- + +api_spec() -> + {metadata(apis()), []}. + +apis() -> + [ {"/gateway", gateway} + , {"/gateway/:name", gateway_insta} + , {"/gateway/:name/stats", gateway_insta_stats} + ]. +%%-------------------------------------------------------------------- +%% http handlers + +gateway(get, Request) -> + Params = maps:get(query_string, Request, #{}), + Status = case maps:get(<<"status">>, Params, undefined) of + undefined -> all; + S0 -> binary_to_existing_atom(S0, utf8) + end, + {200, emqx_gateway_http:gateways(Status)}. + +gateway_insta(delete, #{bindings := #{name := Name0}}) -> + Name = binary_to_existing_atom(Name0), + case emqx_gateway:unload(Name) of + ok -> + {204}; + {error, not_found} -> + return_http_error(404, <<"Gateway not found">>) + end; +gateway_insta(get, #{bindings := #{name := Name0}}) -> + Name = binary_to_existing_atom(Name0), + case emqx_gateway:lookup(Name) of + #{config := _Config} -> + %% FIXME: Got the parsed config, but we should return rawconfig to + %% frontend + RawConf = emqx_config:fill_defaults( + emqx_config:get_root_raw([<<"gateway">>]) + ), + {200, emqx_map_lib:deep_get([<<"gateway">>, Name0], RawConf)}; + undefined -> + return_http_error(404, <<"Gateway not found">>) + end; +gateway_insta(put, #{body := RawConfsIn, + bindings := #{name := Name} + }) -> + %% FIXME: Cluster Consistence ?? + case emqx_gateway:update_rawconf(Name, RawConfsIn) of + ok -> + {200}; + {error, not_found} -> + return_http_error(404, <<"Gateway not found">>); + {error, Reason} -> + return_http_error(500, Reason) + end. + +gateway_insta_stats(get, _Req) -> + return_http_error(401, <<"Implement it later (maybe 5.1)">>). + + +%%-------------------------------------------------------------------- +%% Swagger defines +%%-------------------------------------------------------------------- + +metadata(APIs) -> + metadata(APIs, []). +metadata([], APIAcc) -> + lists:reverse(APIAcc); +metadata([{Path, Fun}|More], APIAcc) -> + Methods = [get, post, put, delete, patch], + Mds = lists:foldl(fun(M, Acc) -> + try + Acc#{M => swagger(Path, M)} + catch + error : function_clause -> + Acc + end + end, #{}, Methods), + metadata(More, [{Path, Mds, Fun} | APIAcc]). + +swagger("/gateway", get) -> + #{ description => <<"Get gateway list">> + , parameters => params_gateway_status_in_qs() + , responses => + #{ <<"200">> => schema_gateway_overview_list() } + }; +swagger("/gateway/:name", get) -> + #{ description => <<"Get the gateway configurations">> + , parameters => params_gateway_name_in_path() + , responses => + #{ <<"404">> => schema_not_found() + , <<"200">> => schema_gateway_conf() + } + }; +swagger("/gateway/:name", delete) -> + #{ description => <<"Delete/Unload the gateway">> + , parameters => params_gateway_name_in_path() + , responses => + #{ <<"404">> => schema_not_found() + , <<"204">> => schema_no_content() + } + }; +swagger("/gateway/:name", put) -> + #{ description => <<"Update the gateway configurations/status">> + , parameters => params_gateway_name_in_path() + , requestBody => schema_gateway_conf() + , responses => + #{ <<"404">> => schema_not_found() + , <<"200">> => schema_no_content() + } + }; +swagger("/gateway/:name/stats", get) -> + #{ description => <<"Get gateway Statistic">> + , parameters => params_gateway_name_in_path() + , responses => + #{ <<"404">> => schema_not_found() + , <<"200">> => schema_gateway_stats() + } + }. + +%%-------------------------------------------------------------------- +%% params defines + +params_gateway_name_in_path() -> + [#{ name => name + , in => path + , schema => #{type => string} + , required => true + }]. + +params_gateway_status_in_qs() -> + [#{ name => status + , in => query + , schema => #{type => string} + , required => false + }]. + +%%-------------------------------------------------------------------- +%% schemas + +schema_not_found() -> + emqx_mgmt_util:error_schema(<<"Gateway not found or unloaded">>). + +schema_no_content() -> + #{description => <<"No Content">>}. + +schema_gateway_overview_list() -> + emqx_mgmt_util:array_schema( + #{ type => object + , properties => properties_gateway_overview() + }, + <<"Gateway Overview list">> + ). %% XXX: This is whole confs for all type gateways. It is used to fill the %% default configurations and generate the swagger-schema @@ -154,234 +290,42 @@ %% --- END --define(EXAMPLE_GATEWAY_STATS, #{ - max_connection => 10240000, - current_connection => 1000, - messages_in => 100.24, - messages_out => 32.5 - }). +schema_gateway_conf() -> + emqx_mgmt_util:schema( + #{oneOf => + [ emqx_mgmt_api_configs:gen_schema(?STOMP_GATEWAY_CONFS) + , emqx_mgmt_api_configs:gen_schema(?MQTTSN_GATEWAY_CONFS) + , emqx_mgmt_api_configs:gen_schema(?COAP_GATEWAY_CONFS) + , emqx_mgmt_api_configs:gen_schema(?LWM2M_GATEWAY_CONFS) + , emqx_mgmt_api_configs:gen_schema(?EXPROTO_GATEWAY_CONFS) + ]}). + +schema_gateway_stats() -> + emqx_mgmt_util:schema( + #{ type => object + , properties => + #{ a_key => #{type => string} + }}). %%-------------------------------------------------------------------- -%% minirest behaviour callbacks -%%-------------------------------------------------------------------- +%% properties -api_spec() -> - {apis(), schemas()}. - -apis() -> - [ {"/gateway", metadata(gateway), gateway} - , {"/gateway/:name", metadata(gateway_insta), gateway_insta} - , {"/gateway/:name/stats", metadata(gateway_insta_stats), gateway_insta_stats} - ]. - -metadata(gateway) -> - #{get => #{ - description => <<"Get gateway list">>, - parameters => [ - #{name => status, - in => query, - schema => #{type => string}, - required => false - } +properties_gateway_overview() -> + ListenerProps = + [ {name, string, + <<"Listener Name">>} + , {status, string, + <<"Listener Status">>, [<<"activing">>, <<"inactived">>]} ], - responses => #{ - <<"200">> => #{ - description => <<"OK">>, - content => #{ - 'application/json' => #{ - schema => minirest:ref(<<"gateway_overrview">>), - examples => #{ - simple => #{ - summary => <<"Gateway List Example">>, - value => emqx_json:encode(?EXAMPLE_GATEWAY_LIST) - } - } - } - } - } - } - }}; - -metadata(gateway_insta) -> - UriNameParamDef = #{name => name, - in => path, - schema => #{type => string}, - required => true - }, - NameNotFoundRespDef = - #{description => <<"Not Found">>, - content => #{ - 'application/json' => #{ - schema => minirest:ref(<<"error">>), - examples => #{ - simple => #{ - summary => <<"Not Found">>, - value => #{ - code => <<"NOT_FOUND">>, - message => <<"The gateway not found">> - } - } - } - } - }}, - #{delete => #{ - description => <<"Delete/Unload the gateway">>, - parameters => [UriNameParamDef], - responses => #{ - <<"404">> => NameNotFoundRespDef, - <<"204">> => #{description => <<"No Content">>} - } - }, - get => #{ - description => <<"Get the gateway configurations">>, - parameters => [UriNameParamDef], - responses => #{ - <<"404">> => NameNotFoundRespDef, - <<"200">> => schema(schema_for_gateway_conf()) - } - }, - put => #{ - description => <<"Update the gateway configurations/status">>, - parameters => [UriNameParamDef], - requestBody => schema(schema_for_gateway_conf()), - responses => #{ - <<"404">> => NameNotFoundRespDef, - <<"200">> => #{description => <<"Changed">>} - } - } - }; - -metadata(gateway_insta_stats) -> - #{get => #{ - description => <<"Get gateway Statistic">>, - responses => #{ - <<"200">> => #{ - description => <<"OK">>, - content => #{ - 'application/json' => #{ - schema => minirest:ref(<<"gateway_stats">>), - examples => #{ - simple => #{ - summary => <<"Gateway Statistic">>, - value => emqx_json:encode(?EXAMPLE_GATEWAY_STATS) - } - } - } - } - } - } - }}. - -schemas() -> - [ #{<<"gateway_overrview">> => schema_for_gateway_overrview()} - , #{<<"gateway_stats">> => schema_for_gateway_stats()} - ]. - -schema_for_gateway_overrview() -> - #{type => array, - items => #{ - type => object, - properties => #{ - name => #{ - type => string, - example => <<"lwm2m">> - }, - status => #{ - type => string, - enum => [<<"running">>, <<"stopped">>, <<"unloaded">>], - example => <<"running">> - }, - started_at => #{ - type => string, - example => <<"2021-08-19T11:45:56.006373+08:00">> - }, - max_connection => #{ - type => integer, - example => 1024000 - }, - current_connection => #{ - type => integer, - example => 1000 - }, - listeners => #{ - type => array, - items => #{ - type => object, - properties => #{ - name => #{ - type => string, - example => <<"lw-udp">> - }, - status => #{ - type => string, - enum => [<<"activing">>, <<"inactived">>] - } - } - } - } - } - } - }. - -schema_for_gateway_conf() -> - #{oneOf => - [ emqx_mgmt_api_configs:gen_schema(?STOMP_GATEWAY_CONFS) - , emqx_mgmt_api_configs:gen_schema(?MQTTSN_GATEWAY_CONFS) - , emqx_mgmt_api_configs:gen_schema(?COAP_GATEWAY_CONFS) - , emqx_mgmt_api_configs:gen_schema(?LWM2M_GATEWAY_CONFS) - , emqx_mgmt_api_configs:gen_schema(?EXPROTO_GATEWAY_CONFS) - ]}. - -schema_for_gateway_stats() -> - #{type => object, - properties => #{ - a_key => #{type => string} - }}. - -%%-------------------------------------------------------------------- -%% http handlers - -gateway(get, Request) -> - Params = maps:get(query_string, Request, #{}), - Status = case maps:get(<<"status">>, Params, undefined) of - undefined -> all; - S0 -> binary_to_existing_atom(S0, utf8) - end, - {200, emqx_gateway_http:gateways(Status)}. - -gateway_insta(delete, #{bindings := #{name := Name0}}) -> - Name = binary_to_existing_atom(Name0), - case emqx_gateway:unload(Name) of - ok -> - {200}; - {error, not_found} -> - return_http_error(404, <<"Gateway not found">>) - end; -gateway_insta(get, #{bindings := #{name := Name0}}) -> - Name = binary_to_existing_atom(Name0), - case emqx_gateway:lookup(Name) of - #{config := _Config} -> - %% FIXME: Got the parsed config, but we should return rawconfig to - %% frontend - RawConf = emqx_config:fill_defaults( - emqx_config:get_root_raw([<<"gateway">>]) - ), - {200, emqx_map_lib:deep_get([<<"gateway">>, Name0], RawConf)}; - undefined -> - return_http_error(404, <<"Gateway not found">>) - end; -gateway_insta(put, #{body := RawConfsIn, - bindings := #{name := Name} - }) -> - %% FIXME: Cluster Consistence ?? - case emqx_gateway:update_rawconf(Name, RawConfsIn) of - ok -> - {200}; - {error, not_found} -> - return_http_error(404, <<"Gateway not found">>); - {error, Reason} -> - return_http_error(500, Reason) - end. - -gateway_insta_stats(get, _Req) -> - return_http_error(401, <<"Implement it later (maybe 5.1)">>). + emqx_mgmt_util:properties( + [ {name, string, + <<"Gateway Name">>} + , {status, string, + <<"Gateway Status">>, + [<<"running">>, <<"stopped">>, <<"unloaded">>]} + , {started_at, string, + <<>>} + , {max_connection, integer, <<>>} + , {current_connection, integer, <<>>} + , {listeners, {array, object}, ListenerProps} + ]). diff --git a/apps/emqx_gateway/src/emqx_gateway_api_clients.erl b/apps/emqx_gateway/src/emqx_gateway_api_clients.erl index 99876c917..b463fb468 100644 --- a/apps/emqx_gateway/src/emqx_gateway_api_clients.erl +++ b/apps/emqx_gateway/src/emqx_gateway_api_clients.erl @@ -481,7 +481,7 @@ queries(Ls) -> end, Ls). %%-------------------------------------------------------------------- -%% Schemas +%% schemas schema_not_found() -> emqx_mgmt_util:error_schema(<<"Gateway not found or unloaded">>). @@ -518,7 +518,7 @@ schema_subscription() -> ). %%-------------------------------------------------------------------- -%% Object properties def +%% properties defines properties_client() -> emqx_mgmt_util:properties( diff --git a/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl b/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl index a57c8e667..1e0c5e2d4 100644 --- a/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl +++ b/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl @@ -548,9 +548,9 @@ check_subscribed_status({SubId, {ParsedTopic, _SubOpts}}, }) -> MountedTopic = emqx_mountpoint:mount(Mountpoint, ParsedTopic), case lists:keyfind(SubId, 1, Subs) of - {SubId, MountedTopic, _Ack} -> + {SubId, MountedTopic, _Ack, _SubOpts} -> ok; - {SubId, _OtherTopic, _Ack} -> + {SubId, _OtherTopic, _Ack, _SubOpts} -> {error, "Conflict subscribe id"}; false -> ok @@ -795,7 +795,7 @@ handle_deliver(Delivers, Frames0 = lists:foldl(fun({_, _, Message}, Acc) -> Topic0 = emqx_message:topic(Message), case lists:keyfind(Topic0, 2, Subs) of - {Id, Topic, Ack} -> + {Id, Topic, Ack, _SubOpts} -> %% XXX: refactor later metrics_inc('messages.delivered', Channel), NMessage = run_hooks_without_metrics( diff --git a/apps/emqx_management/src/emqx_mgmt_util.erl b/apps/emqx_management/src/emqx_mgmt_util.erl index 4c1009610..48bef33ac 100644 --- a/apps/emqx_management/src/emqx_mgmt_util.erl +++ b/apps/emqx_management/src/emqx_mgmt_util.erl @@ -224,6 +224,11 @@ properties([{Key, Type} | Props], Acc) -> properties([{Key, object, Props1} | Props], Acc) -> properties(Props, maps:put(Key, #{type => object, properties => properties(Props1)}, Acc)); +properties([{Key, {array, object}, Props1} | Props], Acc) -> + properties(Props, maps:put(Key, #{type => array, + items => #{type => object, + properties => properties(Props1) + }}, Acc)); properties([{Key, {array, Type}, Desc} | Props], Acc) -> properties(Props, maps:put(Key, #{type => array, items => #{type => Type},