Merge remote-tracking branch 'origin/release-v43' into release-v44
This commit is contained in:
commit
f0b350ba4f
|
@ -45,7 +45,7 @@ jobs:
|
||||||
with:
|
with:
|
||||||
# dialyzer PLTs
|
# dialyzer PLTs
|
||||||
path: ~/.cache/rebar3/
|
path: ~/.cache/rebar3/
|
||||||
key: dialyer-${{ matrix.otp }}
|
key: dialyzer-${{ matrix.otp }}
|
||||||
- name: make xref
|
- name: make xref
|
||||||
run: make xref
|
run: make xref
|
||||||
- name: make dialyzer
|
- name: make dialyzer
|
||||||
|
|
|
@ -122,8 +122,8 @@ cli(_) ->
|
||||||
, {"acl list ", "List all acls"}
|
, {"acl list ", "List all acls"}
|
||||||
, {"acl show clientid <Clientid>", "Lookup clientid acl detail"}
|
, {"acl show clientid <Clientid>", "Lookup clientid acl detail"}
|
||||||
, {"acl show username <Username>", "Lookup username acl detail"}
|
, {"acl show username <Username>", "Lookup username acl detail"}
|
||||||
, {"acl aad clientid <Clientid> <Topic> <Action> <Access>", "Add clientid acl"}
|
, {"acl add clientid <Clientid> <Topic> <Action> <Access>", "Add clientid acl"}
|
||||||
, {"acl add Username <Username> <Topic> <Action> <Access>", "Add username acl"}
|
, {"acl add username <Username> <Topic> <Action> <Access>", "Add username acl"}
|
||||||
, {"acl add _all <Topic> <Action> <Access>", "Add $all acl"}
|
, {"acl add _all <Topic> <Action> <Access>", "Add $all acl"}
|
||||||
, {"acl delete clientid <Clientid> <Topic>", "Delete clientid acl"}
|
, {"acl delete clientid <Clientid> <Topic>", "Delete clientid acl"}
|
||||||
, {"acl delete username <Username> <Topic>", "Delete username acl"}
|
, {"acl delete username <Username> <Topic>", "Delete username acl"}
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
{application, emqx_auth_mnesia,
|
{application, emqx_auth_mnesia,
|
||||||
[{description, "EMQ X Authentication with Mnesia"},
|
[{description, "EMQ X Authentication with Mnesia"},
|
||||||
{vsn, "4.3.9"}, % strict semver, bump manually
|
{vsn, "4.3.10"}, % strict semver, bump manually
|
||||||
{modules, []},
|
{modules, []},
|
||||||
{registered, []},
|
{registered, []},
|
||||||
{applications, [kernel,stdlib,mnesia]},
|
{applications, [kernel,stdlib,mnesia]},
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
{application, emqx_coap,
|
{application, emqx_coap,
|
||||||
[{description, "EMQ X CoAP Gateway"},
|
[{description, "EMQ X CoAP Gateway"},
|
||||||
{vsn, "4.3.1"}, % strict semver, bump manually!
|
{vsn, "4.3.2"}, % strict semver, bump manually!
|
||||||
{modules, []},
|
{modules, []},
|
||||||
{registered, []},
|
{registered, []},
|
||||||
{applications, [kernel,stdlib,gen_coap]},
|
{applications, [kernel,stdlib,gen_coap]},
|
||||||
|
|
|
@ -1,9 +1,15 @@
|
||||||
%% -*-: erlang -*-
|
%% -*-: erlang -*-
|
||||||
{VSN,
|
{VSN,
|
||||||
[{"4.3.0",[
|
[{<<"4\\.3\\.[0-1]">>,[
|
||||||
{load_module, emqx_coap_mqtt_adapter, brutal_purge, soft_purge, []}]},
|
{load_module, emqx_coap_mqtt_adapter, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_coap_pubsub_resource, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_coap_resource, brutal_purge, soft_purge, []}
|
||||||
|
]},
|
||||||
{<<".*">>, []}],
|
{<<".*">>, []}],
|
||||||
[{"4.3.0",[
|
[{<<"4\\.3\\.[0-1]">>,[
|
||||||
{load_module, emqx_coap_mqtt_adapter, brutal_purge, soft_purge, []}]},
|
{load_module, emqx_coap_mqtt_adapter, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_coap_pubsub_resource, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_coap_resource, brutal_purge, soft_purge, []}
|
||||||
|
]},
|
||||||
{<<".*">>, []}]
|
{<<".*">>, []}]
|
||||||
}.
|
}.
|
||||||
|
|
|
@ -31,6 +31,9 @@
|
||||||
-export([ subscribe/2
|
-export([ subscribe/2
|
||||||
, unsubscribe/2
|
, unsubscribe/2
|
||||||
, publish/3
|
, publish/3
|
||||||
|
, received_puback/2
|
||||||
|
, message_payload/1
|
||||||
|
, message_topic/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([ client_pid/4
|
-export([ client_pid/4
|
||||||
|
@ -95,6 +98,15 @@ unsubscribe(Pid, Topic) ->
|
||||||
publish(Pid, Topic, Payload) ->
|
publish(Pid, Topic, Payload) ->
|
||||||
gen_server:call(Pid, {publish, Topic, Payload}).
|
gen_server:call(Pid, {publish, Topic, Payload}).
|
||||||
|
|
||||||
|
received_puback(Pid, Msg) ->
|
||||||
|
gen_server:cast(Pid, {received_puback, Msg}).
|
||||||
|
|
||||||
|
message_payload(#message{payload = Payload}) ->
|
||||||
|
Payload.
|
||||||
|
|
||||||
|
message_topic(#message{topic = Topic}) ->
|
||||||
|
Topic.
|
||||||
|
|
||||||
%% For emqx_management plugin
|
%% For emqx_management plugin
|
||||||
call(Pid, Msg) ->
|
call(Pid, Msg) ->
|
||||||
call(Pid, Msg, infinity).
|
call(Pid, Msg, infinity).
|
||||||
|
@ -172,13 +184,19 @@ handle_call(Request, _From, State) ->
|
||||||
?LOG(error, "adapter unexpected call ~p", [Request]),
|
?LOG(error, "adapter unexpected call ~p", [Request]),
|
||||||
{reply, ignored, State, hibernate}.
|
{reply, ignored, State, hibernate}.
|
||||||
|
|
||||||
|
handle_cast({received_puback, Msg}, State) ->
|
||||||
|
%% NOTE: the counter named 'messages.acked', but the hook named 'message.acked'!
|
||||||
|
ok = emqx_metrics:inc('messages.acked'),
|
||||||
|
_ = emqx_hooks:run('message.acked', [conninfo(State), Msg]),
|
||||||
|
{noreply, State, hibernate};
|
||||||
|
|
||||||
handle_cast(Msg, State) ->
|
handle_cast(Msg, State) ->
|
||||||
?LOG(error, "broker_api unexpected cast ~p", [Msg]),
|
?LOG(error, "broker_api unexpected cast ~p", [Msg]),
|
||||||
{noreply, State, hibernate}.
|
{noreply, State, hibernate}.
|
||||||
|
|
||||||
handle_info({deliver, _Topic, #message{topic = Topic, payload = Payload}},
|
handle_info({deliver, _Topic, #message{} = Msg},
|
||||||
State = #state{sub_topics = Subscribers}) ->
|
State = #state{sub_topics = Subscribers}) ->
|
||||||
deliver([{Topic, Payload}], Subscribers),
|
deliver([Msg], Subscribers),
|
||||||
{noreply, State, hibernate};
|
{noreply, State, hibernate};
|
||||||
|
|
||||||
handle_info(check_alive, State = #state{sub_topics = []}) ->
|
handle_info(check_alive, State = #state{sub_topics = []}) ->
|
||||||
|
@ -271,27 +289,25 @@ packet_to_message(Topic, Payload,
|
||||||
%% Deliver
|
%% Deliver
|
||||||
|
|
||||||
deliver([], _) -> ok;
|
deliver([], _) -> ok;
|
||||||
deliver([Pub | More], Subscribers) ->
|
deliver([Msg | More], Subscribers) ->
|
||||||
ok = do_deliver(Pub, Subscribers),
|
ok = do_deliver(Msg, Subscribers),
|
||||||
deliver(More, Subscribers).
|
deliver(More, Subscribers).
|
||||||
|
|
||||||
do_deliver({Topic, Payload}, Subscribers) ->
|
do_deliver(Msg, Subscribers) ->
|
||||||
%% handle PUBLISH packet from broker
|
%% handle PUBLISH packet from broker
|
||||||
?LOG(debug, "deliver message from broker Topic=~p, Payload=~p", [Topic, Payload]),
|
?LOG(debug, "deliver message from broker, msg: ~p", [Msg]),
|
||||||
deliver_to_coap(Topic, Payload, Subscribers),
|
deliver_to_coap(Msg, Subscribers),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
deliver_to_coap(_TopicName, _Payload, []) ->
|
deliver_to_coap(_Msg, []) ->
|
||||||
ok;
|
ok;
|
||||||
deliver_to_coap(TopicName, Payload, [{TopicFilter, {IsWild, CoapPid}} | T]) ->
|
deliver_to_coap(#message{topic = TopicName} = Msg, [{TopicFilter, {IsWild, CoapPid}} | T]) ->
|
||||||
Matched = case IsWild of
|
Matched = case IsWild of
|
||||||
true -> emqx_topic:match(TopicName, TopicFilter);
|
true -> emqx_topic:match(TopicName, TopicFilter);
|
||||||
false -> TopicName =:= TopicFilter
|
false -> TopicName =:= TopicFilter
|
||||||
end,
|
end,
|
||||||
%?LOG(debug, "deliver_to_coap Matched=~p, CoapPid=~p, TopicName=~p, Payload=~p, T=~p",
|
Matched andalso (CoapPid ! {dispatch, Msg}),
|
||||||
% [Matched, CoapPid, TopicName, Payload, T]),
|
deliver_to_coap(Msg, T).
|
||||||
Matched andalso (CoapPid ! {dispatch, TopicName, Payload}),
|
|
||||||
deliver_to_coap(TopicName, Payload, T).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Helper funcs
|
%% Helper funcs
|
||||||
|
@ -328,12 +344,13 @@ chann_info(State) ->
|
||||||
will_msg => undefined
|
will_msg => undefined
|
||||||
}.
|
}.
|
||||||
|
|
||||||
conninfo(#state{peername = Peername,
|
conninfo(#state{peername = {PeerHost, _} = Peername,
|
||||||
clientid = ClientId,
|
clientid = ClientId,
|
||||||
connected_at = ConnectedAt}) ->
|
connected_at = ConnectedAt}) ->
|
||||||
#{socktype => udp,
|
#{socktype => udp,
|
||||||
sockname => {{127, 0, 0, 1}, 5683},
|
sockname => {{127, 0, 0, 1}, 5683},
|
||||||
peername => Peername,
|
peername => Peername,
|
||||||
|
peerhost => PeerHost,
|
||||||
peercert => nossl, %% TODO: dtls
|
peercert => nossl, %% TODO: dtls
|
||||||
conn_mod => ?MODULE,
|
conn_mod => ?MODULE,
|
||||||
proto_name => <<"CoAP">>,
|
proto_name => <<"CoAP">>,
|
||||||
|
|
|
@ -138,16 +138,32 @@ coap_unobserve({state, ChId, Prefix, TopicPath}) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
handle_info({dispatch, Topic, Payload}, State) ->
|
handle_info({dispatch, Topic, Payload}, State) ->
|
||||||
|
%% This clause should never be matched any more. We keep it here to handle
|
||||||
|
%% the old format messages during the release upgrade.
|
||||||
|
%% In this case the second function clause of `coap_ack/2` will be called,
|
||||||
|
%% and the ACKs is discarded.
|
||||||
?LOG(debug, "dispatch Topic=~p, Payload=~p", [Topic, Payload]),
|
?LOG(debug, "dispatch Topic=~p, Payload=~p", [Topic, Payload]),
|
||||||
{ok, Ret} = emqx_coap_pubsub_topics:reset_topic_info(Topic, Payload),
|
{ok, Ret} = emqx_coap_pubsub_topics:reset_topic_info(Topic, Payload),
|
||||||
?LOG(debug, "Updated publish info of topic=~p, the Ret is ~p", [Topic, Ret]),
|
?LOG(debug, "Updated publish info of topic=~p, the Ret is ~p", [Topic, Ret]),
|
||||||
{notify, [], #coap_content{format = <<"application/octet-stream">>, payload = Payload}, State};
|
{notify, [], #coap_content{format = <<"application/octet-stream">>, payload = Payload}, State};
|
||||||
|
handle_info({dispatch, Msg}, State) ->
|
||||||
|
Payload = emqx_coap_mqtt_adapter:message_payload(Msg),
|
||||||
|
Topic = emqx_coap_mqtt_adapter:message_topic(Msg),
|
||||||
|
{ok, Ret} = emqx_coap_pubsub_topics:reset_topic_info(Topic, Payload),
|
||||||
|
?LOG(debug, "Updated publish info of topic=~p, the Ret is ~p", [Topic, Ret]),
|
||||||
|
{notify, {pub, Msg}, #coap_content{format = <<"application/octet-stream">>, payload = Payload}, State};
|
||||||
handle_info(Message, State) ->
|
handle_info(Message, State) ->
|
||||||
?LOG(error, "Unknown Message ~p", [Message]),
|
?LOG(error, "Unknown Message ~p", [Message]),
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
coap_ack(_Ref, State) -> {ok, State}.
|
coap_ack({pub, Msg}, State) ->
|
||||||
|
?LOG(debug, "received coap ack for publish msg: ~p", [Msg]),
|
||||||
|
Pid = get(mqtt_client_pid),
|
||||||
|
emqx_coap_mqtt_adapter:received_puback(Pid, Msg),
|
||||||
|
{ok, State};
|
||||||
|
coap_ack(_Ref, State) ->
|
||||||
|
?LOG(debug, "received coap ack: ~p", [_Ref]),
|
||||||
|
{ok, State}.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Internal Functions
|
%% Internal Functions
|
||||||
|
|
|
@ -104,12 +104,26 @@ coap_unobserve({state, ChId, Prefix, Topic}) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
handle_info({dispatch, Topic, Payload}, State) ->
|
handle_info({dispatch, Topic, Payload}, State) ->
|
||||||
|
%% This clause should never be matched any more. We keep it here to handle
|
||||||
|
%% the old format messages during the release upgrade.
|
||||||
|
%% In this case the second function clause of `coap_ack/2` will be called,
|
||||||
|
%% and the ACKs is discarded.
|
||||||
?LOG(debug, "dispatch Topic=~p, Payload=~p", [Topic, Payload]),
|
?LOG(debug, "dispatch Topic=~p, Payload=~p", [Topic, Payload]),
|
||||||
{notify, [], #coap_content{format = <<"application/octet-stream">>, payload = Payload}, State};
|
{notify, [], #coap_content{format = <<"application/octet-stream">>, payload = Payload}, State};
|
||||||
|
handle_info({dispatch, Msg}, State) ->
|
||||||
|
Payload = emqx_coap_mqtt_adapter:message_payload(Msg),
|
||||||
|
{notify, {pub, Msg}, #coap_content{format = <<"application/octet-stream">>, payload = Payload}, State};
|
||||||
handle_info(Message, State) ->
|
handle_info(Message, State) ->
|
||||||
emqx_coap_mqtt_adapter:handle_info(Message, State).
|
emqx_coap_mqtt_adapter:handle_info(Message, State).
|
||||||
|
|
||||||
coap_ack(_Ref, State) -> {ok, State}.
|
coap_ack({pub, Msg}, State) ->
|
||||||
|
?LOG(debug, "received coap ack for publish msg: ~p", [Msg]),
|
||||||
|
Pid = get(mqtt_client_pid),
|
||||||
|
emqx_coap_mqtt_adapter:received_puback(Pid, Msg),
|
||||||
|
{ok, State};
|
||||||
|
coap_ack(_Ref, State) ->
|
||||||
|
?LOG(debug, "received coap ack: ~p", [_Ref]),
|
||||||
|
{ok, State}.
|
||||||
|
|
||||||
get_auth(Query) ->
|
get_auth(Query) ->
|
||||||
get_auth(Query, #coap_mqtt_auth{}).
|
get_auth(Query, #coap_mqtt_auth{}).
|
||||||
|
|
|
@ -91,7 +91,7 @@ t_observe(_Config) ->
|
||||||
Topic = <<"abc">>, TopicStr = binary_to_list(Topic),
|
Topic = <<"abc">>, TopicStr = binary_to_list(Topic),
|
||||||
Payload = <<"123">>,
|
Payload = <<"123">>,
|
||||||
Uri = "coap://127.0.0.1/mqtt/"++TopicStr++"?c=client1&u=tom&p=secret",
|
Uri = "coap://127.0.0.1/mqtt/"++TopicStr++"?c=client1&u=tom&p=secret",
|
||||||
{ok, Pid, N, Code, Content} = er_coap_observer:observe(Uri),
|
{ok, Pid, N, Code, Content} = er_coap_observer:observe(Uri),
|
||||||
?LOGT("observer Pid=~p, N=~p, Code=~p, Content=~p", [Pid, N, Code, Content]),
|
?LOGT("observer Pid=~p, N=~p, Code=~p, Content=~p", [Pid, N, Code, Content]),
|
||||||
|
|
||||||
[SubPid] = emqx:subscribers(Topic),
|
[SubPid] = emqx:subscribers(Topic),
|
||||||
|
@ -195,12 +195,16 @@ t_one_clientid_sub_2_topics(_Config) ->
|
||||||
[SubPid] = emqx:subscribers(Topic2),
|
[SubPid] = emqx:subscribers(Topic2),
|
||||||
?assert(is_pid(SubPid)),
|
?assert(is_pid(SubPid)),
|
||||||
|
|
||||||
|
CntrAcked1 = emqx_metrics:val('messages.acked'),
|
||||||
emqx:publish(emqx_message:make(Topic1, Payload1)),
|
emqx:publish(emqx_message:make(Topic1, Payload1)),
|
||||||
|
|
||||||
Notif1 = receive_notification(),
|
Notif1 = receive_notification(),
|
||||||
?LOGT("observer 1 get Notif=~p", [Notif1]),
|
?LOGT("observer 1 get Notif=~p", [Notif1]),
|
||||||
{coap_notify, _, _, {ok,content}, #coap_content{payload = PayloadRecv1}} = Notif1,
|
{coap_notify, _, _, {ok,content}, #coap_content{payload = PayloadRecv1}} = Notif1,
|
||||||
?assertEqual(Payload1, PayloadRecv1),
|
?assertEqual(Payload1, PayloadRecv1),
|
||||||
|
timer:sleep(100),
|
||||||
|
CntrAcked2 = emqx_metrics:val('messages.acked'),
|
||||||
|
?assertEqual(CntrAcked2, CntrAcked1 + 1),
|
||||||
|
|
||||||
emqx:publish(emqx_message:make(Topic2, Payload2)),
|
emqx:publish(emqx_message:make(Topic2, Payload2)),
|
||||||
|
|
||||||
|
@ -208,6 +212,9 @@ t_one_clientid_sub_2_topics(_Config) ->
|
||||||
?LOGT("observer 2 get Notif=~p", [Notif2]),
|
?LOGT("observer 2 get Notif=~p", [Notif2]),
|
||||||
{coap_notify, _, _, {ok,content}, #coap_content{payload = PayloadRecv2}} = Notif2,
|
{coap_notify, _, _, {ok,content}, #coap_content{payload = PayloadRecv2}} = Notif2,
|
||||||
?assertEqual(Payload2, PayloadRecv2),
|
?assertEqual(Payload2, PayloadRecv2),
|
||||||
|
timer:sleep(100),
|
||||||
|
CntrAcked3 = emqx_metrics:val('messages.acked'),
|
||||||
|
?assertEqual(CntrAcked3, CntrAcked2 + 1),
|
||||||
|
|
||||||
er_coap_observer:stop(Pid1),
|
er_coap_observer:stop(Pid1),
|
||||||
er_coap_observer:stop(Pid2).
|
er_coap_observer:stop(Pid2).
|
||||||
|
|
|
@ -20,6 +20,16 @@ management.default_application.id = admin
|
||||||
## Value: String
|
## Value: String
|
||||||
management.default_application.secret = public
|
management.default_application.secret = public
|
||||||
|
|
||||||
|
## Initialize apps file
|
||||||
|
## Is used to add administrative app/secrets when EMQX is launched for the first time.
|
||||||
|
## This config will not take any effect once EMQX database is populated with the provided apps.
|
||||||
|
## The file content format is as below:
|
||||||
|
## ```
|
||||||
|
##819e5db182cf:l9C5suZClIF3FvdzWqmINrVU61WNfIjcglxw9CVM7y1VI
|
||||||
|
##bb5a6cf1c06a:WuNRRgcRTGiNcuyrE49Bpwz4PGPrRnP4hUMi647kNSbN
|
||||||
|
## ```
|
||||||
|
# management.bootstrap_apps_file = {{ platform_etc_dir }}/bootstrap_apps.txt
|
||||||
|
|
||||||
##--------------------------------------------------------------------
|
##--------------------------------------------------------------------
|
||||||
## HTTP Listener
|
## HTTP Listener
|
||||||
|
|
||||||
|
|
|
@ -6,6 +6,11 @@
|
||||||
{datatype, integer}
|
{datatype, integer}
|
||||||
]}.
|
]}.
|
||||||
|
|
||||||
|
{mapping, "management.bootstrap_apps_file", "emqx_management.bootstrap_apps_file", [
|
||||||
|
{datatype, string},
|
||||||
|
hidden
|
||||||
|
]}.
|
||||||
|
|
||||||
{mapping, "management.default_application.id", "emqx_management.default_application_id", [
|
{mapping, "management.default_application.id", "emqx_management.default_application_id", [
|
||||||
{default, undefined},
|
{default, undefined},
|
||||||
{datatype, string}
|
{datatype, string}
|
||||||
|
|
|
@ -25,11 +25,16 @@
|
||||||
]).
|
]).
|
||||||
|
|
||||||
start(_Type, _Args) ->
|
start(_Type, _Args) ->
|
||||||
{ok, Sup} = emqx_mgmt_sup:start_link(),
|
case emqx_mgmt_auth:init_bootstrap_apps() of
|
||||||
_ = emqx_mgmt_auth:add_default_app(),
|
ok ->
|
||||||
emqx_mgmt_http:start_listeners(),
|
{ok, Sup} = emqx_mgmt_sup:start_link(),
|
||||||
emqx_mgmt_cli:load(),
|
_ = emqx_mgmt_auth:add_default_app(),
|
||||||
{ok, Sup}.
|
emqx_mgmt_http:start_listeners(),
|
||||||
|
emqx_mgmt_cli:load(),
|
||||||
|
{ok, Sup};
|
||||||
|
{error, _Reason} = Error ->
|
||||||
|
Error
|
||||||
|
end.
|
||||||
|
|
||||||
stop(_State) ->
|
stop(_State) ->
|
||||||
emqx_mgmt_http:stop_listeners().
|
emqx_mgmt_http:stop_listeners().
|
||||||
|
|
|
@ -35,6 +35,7 @@
|
||||||
, update_app/5
|
, update_app/5
|
||||||
, del_app/1
|
, del_app/1
|
||||||
, list_apps/0
|
, list_apps/0
|
||||||
|
, init_bootstrap_apps/0
|
||||||
]).
|
]).
|
||||||
|
|
||||||
%% APP Auth/ACL API
|
%% APP Auth/ACL API
|
||||||
|
@ -44,6 +45,8 @@
|
||||||
|
|
||||||
-record(mqtt_app, {id, secret, name, desc, status, expired}).
|
-record(mqtt_app, {id, secret, name, desc, status, expired}).
|
||||||
|
|
||||||
|
-define(BOOTSTRAP_TAG, <<"Bootstrapped From File">>).
|
||||||
|
|
||||||
-type(appid() :: binary()).
|
-type(appid() :: binary()).
|
||||||
|
|
||||||
-type(appsecret() :: binary()).
|
-type(appsecret() :: binary()).
|
||||||
|
@ -77,6 +80,68 @@ add_default_app() ->
|
||||||
add_app(AppId1, <<"Default">>, AppSecret1, <<"Application user">>, true, undefined)
|
add_app(AppId1, <<"Default">>, AppSecret1, <<"Application user">>, true, undefined)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
init_bootstrap_apps() ->
|
||||||
|
Bootstrap = application:get_env(emqx_management, bootstrap_apps_file, undefined),
|
||||||
|
Size = mnesia:table_info(mqtt_app, size),
|
||||||
|
init_bootstrap_apps(Bootstrap, Size).
|
||||||
|
|
||||||
|
init_bootstrap_apps(undefined, _) -> ok;
|
||||||
|
init_bootstrap_apps(_File, Size)when Size > 0 -> ok;
|
||||||
|
init_bootstrap_apps(File, 0) ->
|
||||||
|
case file:open(File, [read, binary]) of
|
||||||
|
{ok, Dev} ->
|
||||||
|
{ok, MP} = re:compile(<<"(\.+):(\.+$)">>, [ungreedy]),
|
||||||
|
case init_bootstrap_apps(File, Dev, MP) of
|
||||||
|
ok -> ok;
|
||||||
|
Error ->
|
||||||
|
%% if failed add bootstrap users, we should clear all bootstrap apps
|
||||||
|
{atomic, ok} = mnesia:clear_table(mqtt_app),
|
||||||
|
Error
|
||||||
|
end;
|
||||||
|
{error, Reason} = Error ->
|
||||||
|
?LOG(error,
|
||||||
|
"failed to open the mgmt bootstrap apps file(~s) for ~p",
|
||||||
|
[File, Reason]
|
||||||
|
),
|
||||||
|
Error
|
||||||
|
end.
|
||||||
|
|
||||||
|
init_bootstrap_apps(File, Dev, MP) ->
|
||||||
|
try
|
||||||
|
add_bootstrap_app(File, Dev, MP, 1)
|
||||||
|
catch
|
||||||
|
throw:Error -> {error, Error};
|
||||||
|
Type:Reason:Stacktrace ->
|
||||||
|
{error, {Type, Reason, Stacktrace}}
|
||||||
|
after
|
||||||
|
file:close(Dev)
|
||||||
|
end.
|
||||||
|
|
||||||
|
add_bootstrap_app(File, Dev, MP, Line) ->
|
||||||
|
case file:read_line(Dev) of
|
||||||
|
{ok, Bin} ->
|
||||||
|
case re:run(Bin, MP, [global, {capture, all_but_first, binary}]) of
|
||||||
|
{match, [[AppId, AppSecret]]} ->
|
||||||
|
Name = <<"bootstraped">>,
|
||||||
|
case add_app(AppId, Name, AppSecret, ?BOOTSTRAP_TAG, true, undefined) of
|
||||||
|
{ok, _} ->
|
||||||
|
add_bootstrap_app(File, Dev, MP, Line + 1);
|
||||||
|
{error, Reason} ->
|
||||||
|
throw(#{file => File, line => Line, content => Bin, reason => Reason})
|
||||||
|
end;
|
||||||
|
_ ->
|
||||||
|
?LOG(error,
|
||||||
|
"failed to bootstrap apps file(~s) for Line(~w): ~ts",
|
||||||
|
[File, Line, Bin]
|
||||||
|
),
|
||||||
|
throw(#{file => File, line => Line, content => Bin, reason => "invalid format"})
|
||||||
|
end;
|
||||||
|
eof ->
|
||||||
|
ok;
|
||||||
|
{error, Error} ->
|
||||||
|
throw(#{file => File, line => Line, reason => Error})
|
||||||
|
end.
|
||||||
|
|
||||||
-spec(add_app(appid(), binary()) -> {ok, appsecret()} | {error, term()}).
|
-spec(add_app(appid(), binary()) -> {ok, appsecret()} | {error, term()}).
|
||||||
add_app(AppId, Name) when is_binary(AppId) ->
|
add_app(AppId, Name) when is_binary(AppId) ->
|
||||||
add_app(AppId, Name, <<"Application user">>, true, undefined).
|
add_app(AppId, Name, <<"Application user">>, true, undefined).
|
||||||
|
|
|
@ -0,0 +1,89 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(emqx_mgmt_bootstrap_app_SUITE).
|
||||||
|
|
||||||
|
-compile(export_all).
|
||||||
|
-compile(nowarn_export_all).
|
||||||
|
|
||||||
|
-include_lib("common_test/include/ct.hrl").
|
||||||
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
-include_lib("emqx/include/emqx.hrl").
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Setups
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
all() ->
|
||||||
|
emqx_ct:all(?MODULE).
|
||||||
|
|
||||||
|
init_per_suite(Config) ->
|
||||||
|
emqx_ct_helpers:boot_modules(all),
|
||||||
|
application:load(emqx_modules),
|
||||||
|
application:load(emqx_modules_spec),
|
||||||
|
application:load(emqx_management),
|
||||||
|
application:stop(emqx_rule_engine),
|
||||||
|
ekka_mnesia:start(),
|
||||||
|
emqx_ct_helpers:start_apps([]),
|
||||||
|
Config.
|
||||||
|
|
||||||
|
end_per_suite(_) ->
|
||||||
|
emqx_ct_helpers:stop_apps([]),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Test cases
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
t_load_ok(_) ->
|
||||||
|
application:stop(emqx_management),
|
||||||
|
Bin = <<"test-1:secret-1\ntest-2:secret-2">>,
|
||||||
|
File = "./bootstrap_apps.txt",
|
||||||
|
ok = file:write_file(File, Bin),
|
||||||
|
_ = mnesia:clear_table(mqtt_app),
|
||||||
|
application:set_env(emqx_management, bootstrap_apps_file, File),
|
||||||
|
{ok, _} = application:ensure_all_started(emqx_management),
|
||||||
|
?assert(emqx_mgmt_auth:is_authorized(<<"test-1">>, <<"secret-1">>)),
|
||||||
|
?assert(emqx_mgmt_auth:is_authorized(<<"test-2">>, <<"secret-2">>)),
|
||||||
|
?assertNot(emqx_mgmt_auth:is_authorized(<<"test-2">>, <<"secret-1">>)),
|
||||||
|
application:stop(emqx_management).
|
||||||
|
|
||||||
|
t_bootstrap_user_file_not_found(_) ->
|
||||||
|
File = "./bootstrap_apps_not_exist.txt",
|
||||||
|
check_load_failed(File),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_load_invalid_username_failed(_) ->
|
||||||
|
Bin = <<"test-1:password-1\ntest&2:password-2">>,
|
||||||
|
File = "./bootstrap_apps.txt",
|
||||||
|
ok = file:write_file(File, Bin),
|
||||||
|
check_load_failed(File),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_load_invalid_format_failed(_) ->
|
||||||
|
Bin = <<"test-1:password-1\ntest-2password-2">>,
|
||||||
|
File = "./bootstrap_apps.txt",
|
||||||
|
ok = file:write_file(File, Bin),
|
||||||
|
check_load_failed(File),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
check_load_failed(File) ->
|
||||||
|
_ = mnesia:clear_table(mqtt_app),
|
||||||
|
application:stop(emqx_management),
|
||||||
|
application:set_env(emqx_management, bootstrap_apps_file, File),
|
||||||
|
?assertMatch({error, _}, application:ensure_all_started(emqx_management)),
|
||||||
|
?assertNot(lists:member(emqx_management, application:which_applications())),
|
||||||
|
?assertEqual(0, mnesia:table_info(mqtt_app, size)).
|
|
@ -2,6 +2,14 @@
|
||||||
|
|
||||||
## Enhancements
|
## Enhancements
|
||||||
|
|
||||||
|
|
||||||
|
- Remove useless information from the dashboard listener failure log [#9260](https://github.com/emqx/emqx/pull/9260).
|
||||||
|
|
||||||
|
- We now trigger the `'message.acked'` hook after the CoAP gateway sends a message to the device and receives the ACK from the device [#9264](https://github.com/emqx/emqx/pull/9264).
|
||||||
|
With this change, the CoAP gateway can be combined with the offline message caching function (in the
|
||||||
|
emqx enterprise), so that CoAP devices are able to read the missed messages from the database when
|
||||||
|
it is online again.
|
||||||
|
|
||||||
- Support to use placeholders like `${var}` in the HTTP `Headers` of rule-engine's Webhook actions [#9239](https://github.com/emqx/emqx/pull/9239).
|
- Support to use placeholders like `${var}` in the HTTP `Headers` of rule-engine's Webhook actions [#9239](https://github.com/emqx/emqx/pull/9239).
|
||||||
|
|
||||||
- Asynchronously refresh the resources and rules during emqx boot-up [#9199](https://github.com/emqx/emqx/pull/9199).
|
- Asynchronously refresh the resources and rules during emqx boot-up [#9199](https://github.com/emqx/emqx/pull/9199).
|
||||||
|
@ -17,6 +25,8 @@
|
||||||
|
|
||||||
- Enhanced log security in ACL modules, sensitive data will be obscured. [#9242](https://github.com/emqx/emqx/pull/9242).
|
- Enhanced log security in ACL modules, sensitive data will be obscured. [#9242](https://github.com/emqx/emqx/pull/9242).
|
||||||
|
|
||||||
|
- Add `management.bootstrap_apps_file` configuration to bulk import default app/secret when EMQX initializes the database [#9273](https://github.com/emqx/emqx/pull/9273).
|
||||||
|
|
||||||
## Bug fixes
|
## Bug fixes
|
||||||
|
|
||||||
- Fix that after uploading a backup file with an UTF8 filename, HTTP API `GET /data/export` fails with status code 500 [#9224](https://github.com/emqx/emqx/pull/9224).
|
- Fix that after uploading a backup file with an UTF8 filename, HTTP API `GET /data/export` fails with status code 500 [#9224](https://github.com/emqx/emqx/pull/9224).
|
||||||
|
|
|
@ -2,6 +2,11 @@
|
||||||
|
|
||||||
## 增强
|
## 增强
|
||||||
|
|
||||||
|
- 删除 Dashboard 监听器失败时日志中的无用信息 [#9260](https://github.com/emqx/emqx/pull/9260).
|
||||||
|
|
||||||
|
- 当 CoAP 网关给设备投递消息并收到设备发来的确认之后,回调 `'message.acked'` 钩子 [#9264](https://github.com/emqx/emqx/pull/9264)。
|
||||||
|
有了这个改动,CoAP 网关可以配合 EMQX (企业版)的离线消息缓存功能,让 CoAP 设备重新上线之后,从数据库读取其离线状态下错过的消息。
|
||||||
|
|
||||||
- 支持在规则引擎的 Webhook 动作的 HTTP Headers 里使用 `${var}` 格式的占位符 [#9239](https://github.com/emqx/emqx/pull/9239)。
|
- 支持在规则引擎的 Webhook 动作的 HTTP Headers 里使用 `${var}` 格式的占位符 [#9239](https://github.com/emqx/emqx/pull/9239)。
|
||||||
|
|
||||||
- 在 emqx 启动时,异步地刷新资源和规则 [#9199](https://github.com/emqx/emqx/pull/9199)。
|
- 在 emqx 启动时,异步地刷新资源和规则 [#9199](https://github.com/emqx/emqx/pull/9199)。
|
||||||
|
@ -17,6 +22,8 @@
|
||||||
|
|
||||||
- 增强 ACL 模块中的日志安全性,敏感数据将被模糊化。[#9242](https://github.com/emqx/emqx/pull/9242)。
|
- 增强 ACL 模块中的日志安全性,敏感数据将被模糊化。[#9242](https://github.com/emqx/emqx/pull/9242)。
|
||||||
|
|
||||||
|
- 增加 `management.bootstrap_apps_file` 配置,可以让 EMQX 初始化数据库时,从该文件批量导入一些 APP / Secret [#9273](https://github.com/emqx/emqx/pull/9273)。
|
||||||
|
|
||||||
## 修复
|
## 修复
|
||||||
|
|
||||||
- 修复若上传的备份文件名中包含 UTF8 字符,`GET /data/export` HTTP 接口返回 500 错误 [#9224](https://github.com/emqx/emqx/pull/9224)。
|
- 修复若上传的备份文件名中包含 UTF8 字符,`GET /data/export` HTTP 接口返回 500 错误 [#9224](https://github.com/emqx/emqx/pull/9224)。
|
||||||
|
|
|
@ -54,7 +54,7 @@ groups() ->
|
||||||
{overview, [sequence], [t_overview]},
|
{overview, [sequence], [t_overview]},
|
||||||
{admins, [sequence], [t_admins_add_delete, t_admins_persist_default_password, t_default_password_persists_after_leaving_cluster]},
|
{admins, [sequence], [t_admins_add_delete, t_admins_persist_default_password, t_default_password_persists_after_leaving_cluster]},
|
||||||
{rest, [sequence], [t_rest_api]},
|
{rest, [sequence], [t_rest_api]},
|
||||||
{cli, [sequence], [t_cli]}
|
{cli, [sequence], [t_cli, t_start_listener_failed_log]}
|
||||||
].
|
].
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
|
@ -237,6 +237,21 @@ t_cli(_Config) ->
|
||||||
AdminList = emqx_dashboard_admin:all_users(),
|
AdminList = emqx_dashboard_admin:all_users(),
|
||||||
?assertEqual(2, length(AdminList)).
|
?assertEqual(2, length(AdminList)).
|
||||||
|
|
||||||
|
t_start_listener_failed_log({init, Config}) ->
|
||||||
|
_ = application:stop(emqx_dashboard),
|
||||||
|
Config;
|
||||||
|
t_start_listener_failed_log({'end', _Config}) ->
|
||||||
|
_ = application:start(emqx_dashboard),
|
||||||
|
ok;
|
||||||
|
t_start_listener_failed_log(_Config) ->
|
||||||
|
ct:capture_start(),
|
||||||
|
Options = [{num_acceptors,4}, {max_connections,512}, {inet6,false}, {ipv6_v6only,false}],
|
||||||
|
?assertError(_, emqx_dashboard:start_listener({http, {"1.1.1.1", 8080}, Options})),
|
||||||
|
ct:capture_stop(),
|
||||||
|
I0 = ct:capture_get(),
|
||||||
|
?assertMatch({match, _}, re:run(iolist_to_binary(I0), "eaddrnotavail", [])),
|
||||||
|
ok.
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Internal functions
|
%% Internal functions
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
|
@ -50,7 +50,7 @@
|
||||||
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.8.1.11"}}}
|
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.8.1.11"}}}
|
||||||
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.7.1"}}}
|
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.7.1"}}}
|
||||||
, {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.3.6"}}}
|
, {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.3.6"}}}
|
||||||
, {minirest, {git, "https://github.com/emqx/minirest", {tag, "0.3.9"}}}
|
, {minirest, {git, "https://github.com/emqx/minirest", {tag, "0.3.10"}}}
|
||||||
, {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.2"}}}
|
, {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.2"}}}
|
||||||
, {replayq, {git, "https://github.com/emqx/replayq", {tag, "0.3.4"}}}
|
, {replayq, {git, "https://github.com/emqx/replayq", {tag, "0.3.4"}}}
|
||||||
, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {branch, "2.0.4"}}}
|
, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {branch, "2.0.4"}}}
|
||||||
|
|
Loading…
Reference in New Issue