diff --git a/apps/emqx_sn/src/emqx_sn_gateway.erl b/apps/emqx_sn/src/emqx_sn_gateway.erl index 9828ed383..a1e062add 100644 --- a/apps/emqx_sn/src/emqx_sn_gateway.erl +++ b/apps/emqx_sn/src/emqx_sn_gateway.erl @@ -74,7 +74,7 @@ sockstate :: emqx_types:sockstate(), sockname :: {inet:ip_address(), inet:port()}, peername :: {inet:ip_address(), inet:port()}, - channel :: emqx_channel:channel(), + channel :: maybe(emqx_channel:channel()), registry :: emqx_sn_registry:registry(), clientid :: maybe(binary()), username :: maybe(binary()), @@ -105,6 +105,8 @@ -define(NO_PEERCERT, undefined). +%% TODO: fix when https://github.com/emqx/emqx-sn/pull/170 is merged +-dialyzer([{nowarn_function, [idle/3]}]). %%-------------------------------------------------------------------- %% Exported APIs @@ -136,7 +138,8 @@ init([{_, SockPid, Sock}, Peername, Options]) -> EnableStats = proplists:get_value(enable_stats, Options, false), case inet:sockname(Sock) of {ok, Sockname} -> - Channel = emqx_channel:init(#{sockname => Sockname, + Channel = emqx_channel:init(#{socktype => udp, + sockname => Sockname, peername => Peername, protocol => 'mqtt-sn', peercert => ?NO_PEERCERT, @@ -195,14 +198,18 @@ idle(cast, {incoming, ?SN_PUBLISH_MSG(_Flag, _TopicId, _MsgId, _Data)}, State = idle(cast, {incoming, ?SN_PUBLISH_MSG(#mqtt_sn_flags{qos = ?QOS_NEG1, topic_id_type = TopicIdType }, TopicId, _MsgId, Data)}, - State = #state{clientid = ClientId, registry = Registry}) -> + State = #state{clientid = ClientId, registry = Registry}) -> TopicName = case (TopicIdType =:= ?SN_SHORT_TOPIC) of - false -> - emqx_sn_registry:lookup_topic(Registry, ClientId, TopicId); + false -> emqx_sn_registry:lookup_topic(Registry, ClientId, TopicId); true -> <> end, - Msg = emqx_message:make(?NEG_QOS_CLIENT_ID, ?QOS_0, TopicName, Data), - (TopicName =/= undefined) andalso emqx_broker:publish(Msg), + case TopicName =/= undefined of + true -> + Msg = emqx_message:make(?NEG_QOS_CLIENT_ID, ?QOS_0, TopicName, Data), + emqx_broker:publish(Msg); + false -> + ok + end; ?LOG(debug, "Client id=~p receives a publish with QoS=-1 in idle mode!", [ClientId], State), {keep_state_and_data, State#state.idle_timeout}; @@ -562,10 +569,9 @@ terminate(Reason, _StateName, #state{clientid = ClientId, channel = Channel, registry = Registry}) -> emqx_sn_registry:unregister_topic(Registry, ClientId), - case {Channel, Reason} of - {undefined, _} -> ok; - {_, _} -> - emqx_channel:terminate(Reason, Channel) + case Channel =:= undefined of + true -> ok; + false -> emqx_channel:terminate(Reason, Channel) end. code_change(_Vsn, StateName, State, _Extra) -> diff --git a/src/emqx_types.erl b/src/emqx_types.erl index c1d3822cb..dd1bd71a9 100644 --- a/src/emqx_types.erl +++ b/src/emqx_types.erl @@ -148,7 +148,8 @@ -type(username() :: maybe(binary())). -type(password() :: maybe(binary())). -type(peerhost() :: inet:ip_address()). --type(peername() :: {inet:ip_address(), inet:port_number()}). +-type(peername() :: {inet:ip_address(), inet:port_number()} + | inet:returned_non_ip_address()). -type(protocol() :: mqtt | 'mqtt-sn' | coap | lwm2m | stomp | none | atom()). -type(auth_result() :: success | client_identifier_not_valid