diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_rpc.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_rpc.erl index 92ef0c60a..73b0df8d3 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_rpc.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_rpc.erl @@ -33,6 +33,7 @@ -type ack_ref() :: emqx_bridge_worker:ack_ref(). -type batch() :: emqx_bridge_worker:batch(). +-type node_or_tuple() :: atom() | {atom(), term()}. -define(HEARTBEAT_INTERVAL, timer:seconds(1)). @@ -61,7 +62,7 @@ stop(#{client_pid := Pid}) when is_pid(Pid) -> ok. %% @doc Callback for `emqx_bridge_connect' behaviour --spec send(node(), batch()) -> {ok, ack_ref()} | {error, any()}. +-spec send(#{address:=node_or_tuple(), _=>_}, batch()) -> {ok, ack_ref()} | {error, any()}. send(#{address := Remote}, Batch) -> case ?RPC:call(Remote, ?MODULE, handle_send, [Batch]) of ok -> diff --git a/apps/emqx_coap/src/emqx_coap_app.erl b/apps/emqx_coap/src/emqx_coap_app.erl index eaabd356b..4e7655a74 100644 --- a/apps/emqx_coap/src/emqx_coap_app.erl +++ b/apps/emqx_coap/src/emqx_coap_app.erl @@ -30,7 +30,7 @@ start(_Type, _Args) -> {ok, Sup} = emqx_coap_sup:start_link(), coap_server_registry:add_handler([<<"mqtt">>], emqx_coap_resource, undefined), coap_server_registry:add_handler([<<"ps">>], emqx_coap_ps_resource, undefined), - emqx_coap_ps_topics:start_link(), + _ = emqx_coap_ps_topics:start_link(), emqx_coap_server:start(application:get_all_env(?APP)), {ok,Sup}. diff --git a/apps/emqx_coap/src/emqx_coap_mqtt_adapter.erl b/apps/emqx_coap/src/emqx_coap_mqtt_adapter.erl index 4c508c272..da0dade5b 100644 --- a/apps/emqx_coap/src/emqx_coap_mqtt_adapter.erl +++ b/apps/emqx_coap/src/emqx_coap_mqtt_adapter.erl @@ -136,7 +136,7 @@ handle_call({unsubscribe, Topic, _CoapPid}, _From, State=#state{sub_topics = Top {reply, ok, State#state{sub_topics = NewTopics}, hibernate}; handle_call({publish, Topic, Payload}, _From, State) -> - chann_publish(Topic, Payload, State), + _ = chann_publish(Topic, Payload, State), {reply, ok, State}; handle_call(info, _From, State) -> @@ -233,10 +233,6 @@ do_deliver({Topic, Payload}, Subscribers) -> %% handle PUBLISH packet from broker ?LOG(debug, "deliver message from broker Topic=~p, Payload=~p", [Topic, Payload]), deliver_to_coap(Topic, Payload, Subscribers), - ok; - -do_deliver(Pkt, _Subscribers) -> - ?LOG(warning, "unknown packet type to deliver, pkt=~p,", [Pkt]), ok. deliver_to_coap(_TopicName, _Payload, []) -> diff --git a/apps/emqx_exproto/src/emqx_exproto_gcli.erl b/apps/emqx_exproto/src/emqx_exproto_gcli.erl index 5ae9b0209..69784b70c 100644 --- a/apps/emqx_exproto/src/emqx_exproto_gcli.erl +++ b/apps/emqx_exproto/src/emqx_exproto_gcli.erl @@ -74,22 +74,24 @@ handle_call(_Request, _From, State) -> {reply, ok, State}. handle_cast({rpc, Fun, Req, Options, From}, State) -> - case catch apply(?CONN_ADAPTER_MOD, Fun, [Req, Options]) of - {ok, Resp, _Metadata} -> - ?LOG(debug, "~p got {ok, ~0p, ~0p}", [Fun, Resp, _Metadata]), - reply(From, Fun, {ok, Resp}); - {error, {Code, Msg}, _Metadata} -> - ?LOG(error, "CALL ~0p:~0p(~0p, ~0p) response errcode: ~0p, errmsg: ~0p", + try + case apply(?CONN_ADAPTER_MOD, Fun, [Req, Options]) of + {ok, Resp, _Metadata} -> + ?LOG(debug, "~p got {ok, ~0p, ~0p}", [Fun, Resp, _Metadata]), + reply(From, Fun, {ok, Resp}); + {error, {Code, Msg}, _Metadata} -> + ?LOG(error, "CALL ~0p:~0p(~0p, ~0p) response errcode: ~0p, errmsg: ~0p", [?CONN_ADAPTER_MOD, Fun, Req, Options, Code, Msg]), - reply(From, Fun, {error, {Code, Msg}}); - {error, Reason} -> - ?LOG(error, "CALL ~0p:~0p(~0p, ~0p) error: ~0p", + reply(From, Fun, {error, {Code, Msg}}); + {error, Reason} -> + ?LOG(error, "CALL ~0p:~0p(~0p, ~0p) error: ~0p", [?CONN_ADAPTER_MOD, Fun, Req, Options, Reason]), - reply(From, Fun, {error, Reason}); - {'EXIT', {Reason, Stk}} -> - ?LOG(error, "CALL ~0p:~0p(~0p, ~0p) throw an exception: ~0p, stacktrace: ~0p", - [?CONN_ADAPTER_MOD, Fun, Req, Options, Reason, Stk]), - reply(From, Fun, {error, Reason}) + reply(From, Fun, {error, Reason}) + end + catch _ : Rsn : Stk -> + ?LOG(error, "CALL ~0p:~0p(~0p, ~0p) throw an exception: ~0p, stacktrace: ~0p", + [?CONN_ADAPTER_MOD, Fun, Req, Options, Rsn, Stk]), + reply(From, Fun, {error, Rsn}) end, {noreply, State}. @@ -107,4 +109,5 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- reply(Pid, Fun, Result) -> - Pid ! {hreply, Fun, Result}. + Pid ! {hreply, Fun, Result}, + ok. diff --git a/apps/emqx_lwm2m/src/emqx_lwm2m_json.erl b/apps/emqx_lwm2m/src/emqx_lwm2m_json.erl index f63e46b18..77fc0619c 100644 --- a/apps/emqx_lwm2m/src/emqx_lwm2m_json.erl +++ b/apps/emqx_lwm2m/src/emqx_lwm2m_json.erl @@ -251,7 +251,7 @@ insert_resource_into_object_instance([ResourceId], Value, Acc) -> ?LOG(debug, "insert_resource_into_object_instance2() ResourceId=~p, Value=~p, Acc=~p", [ResourceId, Value, Acc]), NewMap = #{tlv_resource_with_value=>integer(ResourceId), value=>Value}, case find_resource(ResourceId, Acc) of - undeinfed -> + undefined -> Acc ++ [NewMap]; Resource -> Acc2 = lists:delete(Resource, Acc), @@ -262,7 +262,7 @@ insert_resource_instance_into_resource(ResourceInstanceId, Value, Acc) -> ?LOG(debug, "insert_resource_instance_into_resource() ResourceInstanceId=~p, Value=~p, Acc=~p", [ResourceInstanceId, Value, Acc]), NewMap = #{tlv_resource_instance=>integer(ResourceInstanceId), value=>Value}, case find_resource_instance(ResourceInstanceId, Acc) of - undeinfed -> + undefined -> Acc ++ [NewMap]; Resource -> Acc2 = lists:delete(Resource, Acc), diff --git a/apps/emqx_lwm2m/src/emqx_lwm2m_message.erl b/apps/emqx_lwm2m/src/emqx_lwm2m_message.erl index 7c1d49ad7..e5cc704ef 100644 --- a/apps/emqx_lwm2m/src/emqx_lwm2m_message.erl +++ b/apps/emqx_lwm2m/src/emqx_lwm2m_message.erl @@ -229,7 +229,7 @@ insert_resource_into_object_instance([ResourceId, ResourceInstanceId], Value, Ac insert_resource_into_object_instance([ResourceId], Value, Acc) -> NewMap = #{tlv_resource_with_value=>integer(ResourceId), value=>Value}, case find_resource(ResourceId, Acc) of - undeinfed -> + undefined -> Acc ++ [NewMap]; Resource -> Acc2 = lists:delete(Resource, Acc), @@ -239,7 +239,7 @@ insert_resource_into_object_instance([ResourceId], Value, Acc) -> insert_resource_instance_into_resource(ResourceInstanceId, Value, Acc) -> NewMap = #{tlv_resource_instance=>integer(ResourceInstanceId), value=>Value}, case find_resource_instance(ResourceInstanceId, Acc) of - undeinfed -> + undefined -> Acc ++ [NewMap]; Resource -> Acc2 = lists:delete(Resource, Acc), diff --git a/apps/emqx_management/src/emqx_mgmt.erl b/apps/emqx_management/src/emqx_mgmt.erl index b5a319278..1098cbf73 100644 --- a/apps/emqx_management/src/emqx_mgmt.erl +++ b/apps/emqx_management/src/emqx_mgmt.erl @@ -585,7 +585,7 @@ delete_all_deactivated_alarms() -> delete_all_deactivated_alarms(Node) when Node =:= node() -> emqx_alarm:delete_all_deactivated_alarms(); -delete_all_deactivated_alarms(Node) -> +delete_all_deactivated_alarms(Node) -> rpc_call(Node, delete_deactivated_alarms, [Node]). %%-------------------------------------------------------------------- @@ -664,7 +664,7 @@ export_auth_username() -> export_auth_mnesia() -> case ets:info(emqx_user) of undefined -> []; - _ -> + _ -> lists:foldl(fun({_, Login, Password, IsSuperuser}, Acc) -> [[{login, Login}, {password, Password}, {is_superuser, IsSuperuser}] | Acc] end, [], ets:tab2list(emqx_user)) @@ -764,7 +764,7 @@ import_auth_clientid(Lists) -> case ets:info(emqx_auth_clientid) of undefined -> ok; _ -> - [ mnesia:dirty_write({emqx_auth_clientid, ClientId, Password}) || #{<<"clientid">> := ClientId, + [ mnesia:dirty_write({emqx_auth_clientid, ClientId, Password}) || #{<<"clientid">> := ClientId, <<"password">> := Password} <- Lists ] end. @@ -772,14 +772,14 @@ import_auth_username(Lists) -> case ets:info(emqx_auth_username) of undefined -> ok; _ -> - [ mnesia:dirty_write({emqx_auth_username, Username, Password}) || #{<<"username">> := Username, + [ mnesia:dirty_write({emqx_auth_username, Username, Password}) || #{<<"username">> := Username, <<"password">> := Password} <- Lists ] end. import_auth_mnesia(Auths) -> case ets:info(emqx_user) of undefined -> ok; - _ -> + _ -> [ mnesia:dirty_write({emqx_user, Login, Password, IsSuperuser}) || #{<<"login">> := Login, <<"password">> := Password, <<"is_superuser">> := IsSuperuser} <- Auths ] @@ -788,14 +788,14 @@ import_auth_mnesia(Auths) -> import_acl_mnesia(Acls) -> case ets:info(emqx_acl) of undefined -> ok; - _ -> - [ mnesia:dirty_write({emqx_acl ,Login, Topic, Action, Allow}) || #{<<"login">> := Login, + _ -> + [ mnesia:dirty_write({emqx_acl ,Login, Topic, Action, Allow}) || #{<<"login">> := Login, <<"topic">> := Topic, <<"action">> := Action, <<"allow">> := Allow} <- Acls ] end. -import_schemas(Schemas) -> +import_schemas(Schemas) -> case ets:info(emqx_schema) of undefined -> ok; _ -> [emqx_schema_registry:add_schema(emqx_schema_api:make_schema_params(Schema)) || Schema <- Schemas] @@ -817,7 +817,7 @@ to_version(Version) when is_list(Version) -> %%-------------------------------------------------------------------- enable_telemetry() -> - [enable_telemetry(Node) || Node <- ekka_mnesia:running_nodes()], ok. + lists:foreach(fun enable_telemetry/1,ekka_mnesia:running_nodes()). enable_telemetry(Node) when Node =:= node() -> emqx_telemetry:enable(); @@ -825,7 +825,7 @@ enable_telemetry(Node) -> rpc_call(Node, enable_telemetry, [Node]). disable_telemetry() -> - [disable_telemetry(Node) || Node <- ekka_mnesia:running_nodes()], ok. + lists:foreach(fun disable_telemetry/1,ekka_mnesia:running_nodes()). disable_telemetry(Node) when Node =:= node() -> emqx_telemetry:disable(); diff --git a/apps/emqx_management/src/emqx_mgmt_cli.erl b/apps/emqx_management/src/emqx_mgmt_cli.erl index 2ddd8d459..591dfa8b4 100644 --- a/apps/emqx_management/src/emqx_mgmt_cli.erl +++ b/apps/emqx_management/src/emqx_mgmt_cli.erl @@ -416,7 +416,7 @@ log(["primary-level"]) -> emqx_ctl:print("~s~n", [Level]); log(["primary-level", Level]) -> - emqx_logger:set_primary_log_level(list_to_atom(Level)), + _ = emqx_logger:set_primary_log_level(list_to_atom(Level)), emqx_ctl:print("~s~n", [emqx_logger:get_primary_log_level()]); log(["handlers", "list"]) ->