Umbrella fix build (#6)
* fix(emqx_bridge_rpc): Dialyzer warnings * fix(emqx_coap_app): Dialyzer warnings * fix(emqx_coap_mqtt_adapter): Dialyzer warnings * fix(emqx_lwm2m_json): Dialyzer warnings * fix(emqx_lwm2m_message): Dialyzer warnings * fix(emqx_mgmt): Dialyzer warnings * fix(emqx_mgmt_cli): Dialyzer warnings * fix(emqx_mgmt): Dialyzer warnings * fix(emqx_bridge_rpc): Dialyzer warnings * fix(emqx_mgmt): Dialyzer warnings * fix(emqx_exproto_gcli): Dialyzer warnings
This commit is contained in:
parent
13b67c0d19
commit
0cb9cbce71
|
|
@ -33,6 +33,7 @@
|
||||||
|
|
||||||
-type ack_ref() :: emqx_bridge_worker:ack_ref().
|
-type ack_ref() :: emqx_bridge_worker:ack_ref().
|
||||||
-type batch() :: emqx_bridge_worker:batch().
|
-type batch() :: emqx_bridge_worker:batch().
|
||||||
|
-type node_or_tuple() :: atom() | {atom(), term()}.
|
||||||
|
|
||||||
-define(HEARTBEAT_INTERVAL, timer:seconds(1)).
|
-define(HEARTBEAT_INTERVAL, timer:seconds(1)).
|
||||||
|
|
||||||
|
|
@ -61,7 +62,7 @@ stop(#{client_pid := Pid}) when is_pid(Pid) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
%% @doc Callback for `emqx_bridge_connect' behaviour
|
%% @doc Callback for `emqx_bridge_connect' behaviour
|
||||||
-spec send(node(), batch()) -> {ok, ack_ref()} | {error, any()}.
|
-spec send(#{address:=node_or_tuple(), _=>_}, batch()) -> {ok, ack_ref()} | {error, any()}.
|
||||||
send(#{address := Remote}, Batch) ->
|
send(#{address := Remote}, Batch) ->
|
||||||
case ?RPC:call(Remote, ?MODULE, handle_send, [Batch]) of
|
case ?RPC:call(Remote, ?MODULE, handle_send, [Batch]) of
|
||||||
ok ->
|
ok ->
|
||||||
|
|
|
||||||
|
|
@ -30,7 +30,7 @@ start(_Type, _Args) ->
|
||||||
{ok, Sup} = emqx_coap_sup:start_link(),
|
{ok, Sup} = emqx_coap_sup:start_link(),
|
||||||
coap_server_registry:add_handler([<<"mqtt">>], emqx_coap_resource, undefined),
|
coap_server_registry:add_handler([<<"mqtt">>], emqx_coap_resource, undefined),
|
||||||
coap_server_registry:add_handler([<<"ps">>], emqx_coap_ps_resource, undefined),
|
coap_server_registry:add_handler([<<"ps">>], emqx_coap_ps_resource, undefined),
|
||||||
emqx_coap_ps_topics:start_link(),
|
_ = emqx_coap_ps_topics:start_link(),
|
||||||
emqx_coap_server:start(application:get_all_env(?APP)),
|
emqx_coap_server:start(application:get_all_env(?APP)),
|
||||||
{ok,Sup}.
|
{ok,Sup}.
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -136,7 +136,7 @@ handle_call({unsubscribe, Topic, _CoapPid}, _From, State=#state{sub_topics = Top
|
||||||
{reply, ok, State#state{sub_topics = NewTopics}, hibernate};
|
{reply, ok, State#state{sub_topics = NewTopics}, hibernate};
|
||||||
|
|
||||||
handle_call({publish, Topic, Payload}, _From, State) ->
|
handle_call({publish, Topic, Payload}, _From, State) ->
|
||||||
chann_publish(Topic, Payload, State),
|
_ = chann_publish(Topic, Payload, State),
|
||||||
{reply, ok, State};
|
{reply, ok, State};
|
||||||
|
|
||||||
handle_call(info, _From, State) ->
|
handle_call(info, _From, State) ->
|
||||||
|
|
@ -233,10 +233,6 @@ do_deliver({Topic, Payload}, Subscribers) ->
|
||||||
%% handle PUBLISH packet from broker
|
%% handle PUBLISH packet from broker
|
||||||
?LOG(debug, "deliver message from broker Topic=~p, Payload=~p", [Topic, Payload]),
|
?LOG(debug, "deliver message from broker Topic=~p, Payload=~p", [Topic, Payload]),
|
||||||
deliver_to_coap(Topic, Payload, Subscribers),
|
deliver_to_coap(Topic, Payload, Subscribers),
|
||||||
ok;
|
|
||||||
|
|
||||||
do_deliver(Pkt, _Subscribers) ->
|
|
||||||
?LOG(warning, "unknown packet type to deliver, pkt=~p,", [Pkt]),
|
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
deliver_to_coap(_TopicName, _Payload, []) ->
|
deliver_to_coap(_TopicName, _Payload, []) ->
|
||||||
|
|
|
||||||
|
|
@ -74,22 +74,24 @@ handle_call(_Request, _From, State) ->
|
||||||
{reply, ok, State}.
|
{reply, ok, State}.
|
||||||
|
|
||||||
handle_cast({rpc, Fun, Req, Options, From}, State) ->
|
handle_cast({rpc, Fun, Req, Options, From}, State) ->
|
||||||
case catch apply(?CONN_ADAPTER_MOD, Fun, [Req, Options]) of
|
try
|
||||||
{ok, Resp, _Metadata} ->
|
case apply(?CONN_ADAPTER_MOD, Fun, [Req, Options]) of
|
||||||
?LOG(debug, "~p got {ok, ~0p, ~0p}", [Fun, Resp, _Metadata]),
|
{ok, Resp, _Metadata} ->
|
||||||
reply(From, Fun, {ok, Resp});
|
?LOG(debug, "~p got {ok, ~0p, ~0p}", [Fun, Resp, _Metadata]),
|
||||||
{error, {Code, Msg}, _Metadata} ->
|
reply(From, Fun, {ok, Resp});
|
||||||
?LOG(error, "CALL ~0p:~0p(~0p, ~0p) response errcode: ~0p, errmsg: ~0p",
|
{error, {Code, Msg}, _Metadata} ->
|
||||||
|
?LOG(error, "CALL ~0p:~0p(~0p, ~0p) response errcode: ~0p, errmsg: ~0p",
|
||||||
[?CONN_ADAPTER_MOD, Fun, Req, Options, Code, Msg]),
|
[?CONN_ADAPTER_MOD, Fun, Req, Options, Code, Msg]),
|
||||||
reply(From, Fun, {error, {Code, Msg}});
|
reply(From, Fun, {error, {Code, Msg}});
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
?LOG(error, "CALL ~0p:~0p(~0p, ~0p) error: ~0p",
|
?LOG(error, "CALL ~0p:~0p(~0p, ~0p) error: ~0p",
|
||||||
[?CONN_ADAPTER_MOD, Fun, Req, Options, Reason]),
|
[?CONN_ADAPTER_MOD, Fun, Req, Options, Reason]),
|
||||||
reply(From, Fun, {error, Reason});
|
reply(From, Fun, {error, Reason})
|
||||||
{'EXIT', {Reason, Stk}} ->
|
end
|
||||||
?LOG(error, "CALL ~0p:~0p(~0p, ~0p) throw an exception: ~0p, stacktrace: ~0p",
|
catch _ : Rsn : Stk ->
|
||||||
[?CONN_ADAPTER_MOD, Fun, Req, Options, Reason, Stk]),
|
?LOG(error, "CALL ~0p:~0p(~0p, ~0p) throw an exception: ~0p, stacktrace: ~0p",
|
||||||
reply(From, Fun, {error, Reason})
|
[?CONN_ADAPTER_MOD, Fun, Req, Options, Rsn, Stk]),
|
||||||
|
reply(From, Fun, {error, Rsn})
|
||||||
end,
|
end,
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
|
|
@ -107,4 +109,5 @@ code_change(_OldVsn, State, _Extra) ->
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
reply(Pid, Fun, Result) ->
|
reply(Pid, Fun, Result) ->
|
||||||
Pid ! {hreply, Fun, Result}.
|
Pid ! {hreply, Fun, Result},
|
||||||
|
ok.
|
||||||
|
|
|
||||||
|
|
@ -251,7 +251,7 @@ insert_resource_into_object_instance([ResourceId], Value, Acc) ->
|
||||||
?LOG(debug, "insert_resource_into_object_instance2() ResourceId=~p, Value=~p, Acc=~p", [ResourceId, Value, Acc]),
|
?LOG(debug, "insert_resource_into_object_instance2() ResourceId=~p, Value=~p, Acc=~p", [ResourceId, Value, Acc]),
|
||||||
NewMap = #{tlv_resource_with_value=>integer(ResourceId), value=>Value},
|
NewMap = #{tlv_resource_with_value=>integer(ResourceId), value=>Value},
|
||||||
case find_resource(ResourceId, Acc) of
|
case find_resource(ResourceId, Acc) of
|
||||||
undeinfed ->
|
undefined ->
|
||||||
Acc ++ [NewMap];
|
Acc ++ [NewMap];
|
||||||
Resource ->
|
Resource ->
|
||||||
Acc2 = lists:delete(Resource, Acc),
|
Acc2 = lists:delete(Resource, Acc),
|
||||||
|
|
@ -262,7 +262,7 @@ insert_resource_instance_into_resource(ResourceInstanceId, Value, Acc) ->
|
||||||
?LOG(debug, "insert_resource_instance_into_resource() ResourceInstanceId=~p, Value=~p, Acc=~p", [ResourceInstanceId, Value, Acc]),
|
?LOG(debug, "insert_resource_instance_into_resource() ResourceInstanceId=~p, Value=~p, Acc=~p", [ResourceInstanceId, Value, Acc]),
|
||||||
NewMap = #{tlv_resource_instance=>integer(ResourceInstanceId), value=>Value},
|
NewMap = #{tlv_resource_instance=>integer(ResourceInstanceId), value=>Value},
|
||||||
case find_resource_instance(ResourceInstanceId, Acc) of
|
case find_resource_instance(ResourceInstanceId, Acc) of
|
||||||
undeinfed ->
|
undefined ->
|
||||||
Acc ++ [NewMap];
|
Acc ++ [NewMap];
|
||||||
Resource ->
|
Resource ->
|
||||||
Acc2 = lists:delete(Resource, Acc),
|
Acc2 = lists:delete(Resource, Acc),
|
||||||
|
|
|
||||||
|
|
@ -229,7 +229,7 @@ insert_resource_into_object_instance([ResourceId, ResourceInstanceId], Value, Ac
|
||||||
insert_resource_into_object_instance([ResourceId], Value, Acc) ->
|
insert_resource_into_object_instance([ResourceId], Value, Acc) ->
|
||||||
NewMap = #{tlv_resource_with_value=>integer(ResourceId), value=>Value},
|
NewMap = #{tlv_resource_with_value=>integer(ResourceId), value=>Value},
|
||||||
case find_resource(ResourceId, Acc) of
|
case find_resource(ResourceId, Acc) of
|
||||||
undeinfed ->
|
undefined ->
|
||||||
Acc ++ [NewMap];
|
Acc ++ [NewMap];
|
||||||
Resource ->
|
Resource ->
|
||||||
Acc2 = lists:delete(Resource, Acc),
|
Acc2 = lists:delete(Resource, Acc),
|
||||||
|
|
@ -239,7 +239,7 @@ insert_resource_into_object_instance([ResourceId], Value, Acc) ->
|
||||||
insert_resource_instance_into_resource(ResourceInstanceId, Value, Acc) ->
|
insert_resource_instance_into_resource(ResourceInstanceId, Value, Acc) ->
|
||||||
NewMap = #{tlv_resource_instance=>integer(ResourceInstanceId), value=>Value},
|
NewMap = #{tlv_resource_instance=>integer(ResourceInstanceId), value=>Value},
|
||||||
case find_resource_instance(ResourceInstanceId, Acc) of
|
case find_resource_instance(ResourceInstanceId, Acc) of
|
||||||
undeinfed ->
|
undefined ->
|
||||||
Acc ++ [NewMap];
|
Acc ++ [NewMap];
|
||||||
Resource ->
|
Resource ->
|
||||||
Acc2 = lists:delete(Resource, Acc),
|
Acc2 = lists:delete(Resource, Acc),
|
||||||
|
|
|
||||||
|
|
@ -585,7 +585,7 @@ delete_all_deactivated_alarms() ->
|
||||||
|
|
||||||
delete_all_deactivated_alarms(Node) when Node =:= node() ->
|
delete_all_deactivated_alarms(Node) when Node =:= node() ->
|
||||||
emqx_alarm:delete_all_deactivated_alarms();
|
emqx_alarm:delete_all_deactivated_alarms();
|
||||||
delete_all_deactivated_alarms(Node) ->
|
delete_all_deactivated_alarms(Node) ->
|
||||||
rpc_call(Node, delete_deactivated_alarms, [Node]).
|
rpc_call(Node, delete_deactivated_alarms, [Node]).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
@ -664,7 +664,7 @@ export_auth_username() ->
|
||||||
export_auth_mnesia() ->
|
export_auth_mnesia() ->
|
||||||
case ets:info(emqx_user) of
|
case ets:info(emqx_user) of
|
||||||
undefined -> [];
|
undefined -> [];
|
||||||
_ ->
|
_ ->
|
||||||
lists:foldl(fun({_, Login, Password, IsSuperuser}, Acc) ->
|
lists:foldl(fun({_, Login, Password, IsSuperuser}, Acc) ->
|
||||||
[[{login, Login}, {password, Password}, {is_superuser, IsSuperuser}] | Acc]
|
[[{login, Login}, {password, Password}, {is_superuser, IsSuperuser}] | Acc]
|
||||||
end, [], ets:tab2list(emqx_user))
|
end, [], ets:tab2list(emqx_user))
|
||||||
|
|
@ -764,7 +764,7 @@ import_auth_clientid(Lists) ->
|
||||||
case ets:info(emqx_auth_clientid) of
|
case ets:info(emqx_auth_clientid) of
|
||||||
undefined -> ok;
|
undefined -> ok;
|
||||||
_ ->
|
_ ->
|
||||||
[ mnesia:dirty_write({emqx_auth_clientid, ClientId, Password}) || #{<<"clientid">> := ClientId,
|
[ mnesia:dirty_write({emqx_auth_clientid, ClientId, Password}) || #{<<"clientid">> := ClientId,
|
||||||
<<"password">> := Password} <- Lists ]
|
<<"password">> := Password} <- Lists ]
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
@ -772,14 +772,14 @@ import_auth_username(Lists) ->
|
||||||
case ets:info(emqx_auth_username) of
|
case ets:info(emqx_auth_username) of
|
||||||
undefined -> ok;
|
undefined -> ok;
|
||||||
_ ->
|
_ ->
|
||||||
[ mnesia:dirty_write({emqx_auth_username, Username, Password}) || #{<<"username">> := Username,
|
[ mnesia:dirty_write({emqx_auth_username, Username, Password}) || #{<<"username">> := Username,
|
||||||
<<"password">> := Password} <- Lists ]
|
<<"password">> := Password} <- Lists ]
|
||||||
end.
|
end.
|
||||||
|
|
||||||
import_auth_mnesia(Auths) ->
|
import_auth_mnesia(Auths) ->
|
||||||
case ets:info(emqx_user) of
|
case ets:info(emqx_user) of
|
||||||
undefined -> ok;
|
undefined -> ok;
|
||||||
_ ->
|
_ ->
|
||||||
[ mnesia:dirty_write({emqx_user, Login, Password, IsSuperuser}) || #{<<"login">> := Login,
|
[ mnesia:dirty_write({emqx_user, Login, Password, IsSuperuser}) || #{<<"login">> := Login,
|
||||||
<<"password">> := Password,
|
<<"password">> := Password,
|
||||||
<<"is_superuser">> := IsSuperuser} <- Auths ]
|
<<"is_superuser">> := IsSuperuser} <- Auths ]
|
||||||
|
|
@ -788,14 +788,14 @@ import_auth_mnesia(Auths) ->
|
||||||
import_acl_mnesia(Acls) ->
|
import_acl_mnesia(Acls) ->
|
||||||
case ets:info(emqx_acl) of
|
case ets:info(emqx_acl) of
|
||||||
undefined -> ok;
|
undefined -> ok;
|
||||||
_ ->
|
_ ->
|
||||||
[ mnesia:dirty_write({emqx_acl ,Login, Topic, Action, Allow}) || #{<<"login">> := Login,
|
[ mnesia:dirty_write({emqx_acl ,Login, Topic, Action, Allow}) || #{<<"login">> := Login,
|
||||||
<<"topic">> := Topic,
|
<<"topic">> := Topic,
|
||||||
<<"action">> := Action,
|
<<"action">> := Action,
|
||||||
<<"allow">> := Allow} <- Acls ]
|
<<"allow">> := Allow} <- Acls ]
|
||||||
end.
|
end.
|
||||||
|
|
||||||
import_schemas(Schemas) ->
|
import_schemas(Schemas) ->
|
||||||
case ets:info(emqx_schema) of
|
case ets:info(emqx_schema) of
|
||||||
undefined -> ok;
|
undefined -> ok;
|
||||||
_ -> [emqx_schema_registry:add_schema(emqx_schema_api:make_schema_params(Schema)) || Schema <- Schemas]
|
_ -> [emqx_schema_registry:add_schema(emqx_schema_api:make_schema_params(Schema)) || Schema <- Schemas]
|
||||||
|
|
@ -817,7 +817,7 @@ to_version(Version) when is_list(Version) ->
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
enable_telemetry() ->
|
enable_telemetry() ->
|
||||||
[enable_telemetry(Node) || Node <- ekka_mnesia:running_nodes()], ok.
|
lists:foreach(fun enable_telemetry/1,ekka_mnesia:running_nodes()).
|
||||||
|
|
||||||
enable_telemetry(Node) when Node =:= node() ->
|
enable_telemetry(Node) when Node =:= node() ->
|
||||||
emqx_telemetry:enable();
|
emqx_telemetry:enable();
|
||||||
|
|
@ -825,7 +825,7 @@ enable_telemetry(Node) ->
|
||||||
rpc_call(Node, enable_telemetry, [Node]).
|
rpc_call(Node, enable_telemetry, [Node]).
|
||||||
|
|
||||||
disable_telemetry() ->
|
disable_telemetry() ->
|
||||||
[disable_telemetry(Node) || Node <- ekka_mnesia:running_nodes()], ok.
|
lists:foreach(fun disable_telemetry/1,ekka_mnesia:running_nodes()).
|
||||||
|
|
||||||
disable_telemetry(Node) when Node =:= node() ->
|
disable_telemetry(Node) when Node =:= node() ->
|
||||||
emqx_telemetry:disable();
|
emqx_telemetry:disable();
|
||||||
|
|
|
||||||
|
|
@ -416,7 +416,7 @@ log(["primary-level"]) ->
|
||||||
emqx_ctl:print("~s~n", [Level]);
|
emqx_ctl:print("~s~n", [Level]);
|
||||||
|
|
||||||
log(["primary-level", Level]) ->
|
log(["primary-level", Level]) ->
|
||||||
emqx_logger:set_primary_log_level(list_to_atom(Level)),
|
_ = emqx_logger:set_primary_log_level(list_to_atom(Level)),
|
||||||
emqx_ctl:print("~s~n", [emqx_logger:get_primary_log_level()]);
|
emqx_ctl:print("~s~n", [emqx_logger:get_primary_log_level()]);
|
||||||
|
|
||||||
log(["handlers", "list"]) ->
|
log(["handlers", "list"]) ->
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue