feat: sync with master

This commit is contained in:
x1001100011 2021-09-12 15:32:03 -07:00
parent 7a7cccb337
commit 5e812a1add
8 changed files with 29 additions and 17587 deletions

29
apps/emqx_exhook/.gitignore vendored Normal file
View File

@ -0,0 +1,29 @@
.rebar3
_*
.eunit
*.o
*.beam
*.plt
*.swp
*.swo
.erlang.cookie
ebin
log
erl_crash.dump
.rebar
logs
_build
.idea
*.iml
rebar3.crashdump
*~
rebar.lock
data/
*.conf.rendered
*.pyc
.DS_Store
*.class
Mnesia.nonode@nohost/
src/emqx_exhook_pb.erl
src/emqx_exhook_v_1_hook_provider_client.erl
src/emqx_exhook_v_1_hook_provider_bhvr.erl

File diff suppressed because it is too large Load Diff

View File

@ -1,93 +0,0 @@
%%%-------------------------------------------------------------------
%% @doc Behaviour to implement for grpc service emqx.exhook.v1.HookProvider.
%% @end
%%%-------------------------------------------------------------------
%% this module was generated and should not be modified manually
-module(emqx_exhook_v_1_hook_provider_bhvr).
-callback on_provider_loaded(emqx_exhook_pb:provider_loaded_request(), grpc:metadata())
-> {ok, emqx_exhook_pb:loaded_response(), grpc:metadata()}
| {error, grpc_stream:error_response()}.
-callback on_provider_unloaded(emqx_exhook_pb:provider_unloaded_request(), grpc:metadata())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_stream:error_response()}.
-callback on_client_connect(emqx_exhook_pb:client_connect_request(), grpc:metadata())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_stream:error_response()}.
-callback on_client_connack(emqx_exhook_pb:client_connack_request(), grpc:metadata())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_stream:error_response()}.
-callback on_client_connected(emqx_exhook_pb:client_connected_request(), grpc:metadata())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_stream:error_response()}.
-callback on_client_disconnected(emqx_exhook_pb:client_disconnected_request(), grpc:metadata())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_stream:error_response()}.
-callback on_client_authenticate(emqx_exhook_pb:client_authenticate_request(), grpc:metadata())
-> {ok, emqx_exhook_pb:valued_response(), grpc:metadata()}
| {error, grpc_stream:error_response()}.
-callback on_client_authorize(emqx_exhook_pb:client_authorize_request(), grpc:metadata())
-> {ok, emqx_exhook_pb:valued_response(), grpc:metadata()}
| {error, grpc_stream:error_response()}.
-callback on_client_subscribe(emqx_exhook_pb:client_subscribe_request(), grpc:metadata())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_stream:error_response()}.
-callback on_client_unsubscribe(emqx_exhook_pb:client_unsubscribe_request(), grpc:metadata())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_stream:error_response()}.
-callback on_session_created(emqx_exhook_pb:session_created_request(), grpc:metadata())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_stream:error_response()}.
-callback on_session_subscribed(emqx_exhook_pb:session_subscribed_request(), grpc:metadata())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_stream:error_response()}.
-callback on_session_unsubscribed(emqx_exhook_pb:session_unsubscribed_request(), grpc:metadata())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_stream:error_response()}.
-callback on_session_resumed(emqx_exhook_pb:session_resumed_request(), grpc:metadata())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_stream:error_response()}.
-callback on_session_discarded(emqx_exhook_pb:session_discarded_request(), grpc:metadata())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_stream:error_response()}.
-callback on_session_takeovered(emqx_exhook_pb:session_takeovered_request(), grpc:metadata())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_stream:error_response()}.
-callback on_session_terminated(emqx_exhook_pb:session_terminated_request(), grpc:metadata())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_stream:error_response()}.
-callback on_message_publish(emqx_exhook_pb:message_publish_request(), grpc:metadata())
-> {ok, emqx_exhook_pb:valued_response(), grpc:metadata()}
| {error, grpc_stream:error_response()}.
-callback on_message_delivered(emqx_exhook_pb:message_delivered_request(), grpc:metadata())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_stream:error_response()}.
-callback on_message_dropped(emqx_exhook_pb:message_dropped_request(), grpc:metadata())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_stream:error_response()}.
-callback on_message_acked(emqx_exhook_pb:message_acked_request(), grpc:metadata())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_stream:error_response()}.

View File

@ -1,445 +0,0 @@
%%%-------------------------------------------------------------------
%% @doc Client module for grpc service emqx.exhook.v1.HookProvider.
%% @end
%%%-------------------------------------------------------------------
%% this module was generated and should not be modified manually
-module(emqx_exhook_v_1_hook_provider_client).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("grpc/include/grpc.hrl").
-define(SERVICE, 'emqx.exhook.v1.HookProvider').
-define(PROTO_MODULE, 'emqx_exhook_pb').
-define(MARSHAL(T), fun(I) -> ?PROTO_MODULE:encode_msg(I, T) end).
-define(UNMARSHAL(T), fun(I) -> ?PROTO_MODULE:decode_msg(I, T) end).
-define(DEF(Path, Req, Resp, MessageType),
#{path => Path,
service =>?SERVICE,
message_type => MessageType,
marshal => ?MARSHAL(Req),
unmarshal => ?UNMARSHAL(Resp)}).
-spec on_provider_loaded(emqx_exhook_pb:provider_loaded_request())
-> {ok, emqx_exhook_pb:loaded_response(), grpc:metadata()}
| {error, term()}.
on_provider_loaded(Req) ->
on_provider_loaded(Req, #{}, #{}).
-spec on_provider_loaded(emqx_exhook_pb:provider_loaded_request(), grpc:options())
-> {ok, emqx_exhook_pb:loaded_response(), grpc:metadata()}
| {error, term()}.
on_provider_loaded(Req, Options) ->
on_provider_loaded(Req, #{}, Options).
-spec on_provider_loaded(emqx_exhook_pb:provider_loaded_request(), grpc:metadata(), grpc_client:options())
-> {ok, emqx_exhook_pb:loaded_response(), grpc:metadata()}
| {error, term()}.
on_provider_loaded(Req, Metadata, Options) ->
grpc_client:unary(?DEF(<<"/emqx.exhook.v1.HookProvider/OnProviderLoaded">>,
provider_loaded_request, loaded_response, <<"emqx.exhook.v1.ProviderLoadedRequest">>),
Req, Metadata, Options).
-spec on_provider_unloaded(emqx_exhook_pb:provider_unloaded_request())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_provider_unloaded(Req) ->
on_provider_unloaded(Req, #{}, #{}).
-spec on_provider_unloaded(emqx_exhook_pb:provider_unloaded_request(), grpc:options())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_provider_unloaded(Req, Options) ->
on_provider_unloaded(Req, #{}, Options).
-spec on_provider_unloaded(emqx_exhook_pb:provider_unloaded_request(), grpc:metadata(), grpc_client:options())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_provider_unloaded(Req, Metadata, Options) ->
grpc_client:unary(?DEF(<<"/emqx.exhook.v1.HookProvider/OnProviderUnloaded">>,
provider_unloaded_request, empty_success, <<"emqx.exhook.v1.ProviderUnloadedRequest">>),
Req, Metadata, Options).
-spec on_client_connect(emqx_exhook_pb:client_connect_request())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_client_connect(Req) ->
on_client_connect(Req, #{}, #{}).
-spec on_client_connect(emqx_exhook_pb:client_connect_request(), grpc:options())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_client_connect(Req, Options) ->
on_client_connect(Req, #{}, Options).
-spec on_client_connect(emqx_exhook_pb:client_connect_request(), grpc:metadata(), grpc_client:options())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_client_connect(Req, Metadata, Options) ->
grpc_client:unary(?DEF(<<"/emqx.exhook.v1.HookProvider/OnClientConnect">>,
client_connect_request, empty_success, <<"emqx.exhook.v1.ClientConnectRequest">>),
Req, Metadata, Options).
-spec on_client_connack(emqx_exhook_pb:client_connack_request())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_client_connack(Req) ->
on_client_connack(Req, #{}, #{}).
-spec on_client_connack(emqx_exhook_pb:client_connack_request(), grpc:options())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_client_connack(Req, Options) ->
on_client_connack(Req, #{}, Options).
-spec on_client_connack(emqx_exhook_pb:client_connack_request(), grpc:metadata(), grpc_client:options())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_client_connack(Req, Metadata, Options) ->
grpc_client:unary(?DEF(<<"/emqx.exhook.v1.HookProvider/OnClientConnack">>,
client_connack_request, empty_success, <<"emqx.exhook.v1.ClientConnackRequest">>),
Req, Metadata, Options).
-spec on_client_connected(emqx_exhook_pb:client_connected_request())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_client_connected(Req) ->
on_client_connected(Req, #{}, #{}).
-spec on_client_connected(emqx_exhook_pb:client_connected_request(), grpc:options())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_client_connected(Req, Options) ->
on_client_connected(Req, #{}, Options).
-spec on_client_connected(emqx_exhook_pb:client_connected_request(), grpc:metadata(), grpc_client:options())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_client_connected(Req, Metadata, Options) ->
grpc_client:unary(?DEF(<<"/emqx.exhook.v1.HookProvider/OnClientConnected">>,
client_connected_request, empty_success, <<"emqx.exhook.v1.ClientConnectedRequest">>),
Req, Metadata, Options).
-spec on_client_disconnected(emqx_exhook_pb:client_disconnected_request())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_client_disconnected(Req) ->
on_client_disconnected(Req, #{}, #{}).
-spec on_client_disconnected(emqx_exhook_pb:client_disconnected_request(), grpc:options())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_client_disconnected(Req, Options) ->
on_client_disconnected(Req, #{}, Options).
-spec on_client_disconnected(emqx_exhook_pb:client_disconnected_request(), grpc:metadata(), grpc_client:options())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_client_disconnected(Req, Metadata, Options) ->
grpc_client:unary(?DEF(<<"/emqx.exhook.v1.HookProvider/OnClientDisconnected">>,
client_disconnected_request, empty_success, <<"emqx.exhook.v1.ClientDisconnectedRequest">>),
Req, Metadata, Options).
-spec on_client_authenticate(emqx_exhook_pb:client_authenticate_request())
-> {ok, emqx_exhook_pb:valued_response(), grpc:metadata()}
| {error, term()}.
on_client_authenticate(Req) ->
on_client_authenticate(Req, #{}, #{}).
-spec on_client_authenticate(emqx_exhook_pb:client_authenticate_request(), grpc:options())
-> {ok, emqx_exhook_pb:valued_response(), grpc:metadata()}
| {error, term()}.
on_client_authenticate(Req, Options) ->
on_client_authenticate(Req, #{}, Options).
-spec on_client_authenticate(emqx_exhook_pb:client_authenticate_request(), grpc:metadata(), grpc_client:options())
-> {ok, emqx_exhook_pb:valued_response(), grpc:metadata()}
| {error, term()}.
on_client_authenticate(Req, Metadata, Options) ->
grpc_client:unary(?DEF(<<"/emqx.exhook.v1.HookProvider/OnClientAuthenticate">>,
client_authenticate_request, valued_response, <<"emqx.exhook.v1.ClientAuthenticateRequest">>),
Req, Metadata, Options).
-spec on_client_authorize(emqx_exhook_pb:client_authorize_request())
-> {ok, emqx_exhook_pb:valued_response(), grpc:metadata()}
| {error, term()}.
on_client_authorize(Req) ->
on_client_authorize(Req, #{}, #{}).
-spec on_client_authorize(emqx_exhook_pb:client_authorize_request(), grpc:options())
-> {ok, emqx_exhook_pb:valued_response(), grpc:metadata()}
| {error, term()}.
on_client_authorize(Req, Options) ->
on_client_authorize(Req, #{}, Options).
-spec on_client_authorize(emqx_exhook_pb:client_authorize_request(), grpc:metadata(), grpc_client:options())
-> {ok, emqx_exhook_pb:valued_response(), grpc:metadata()}
| {error, term()}.
on_client_authorize(Req, Metadata, Options) ->
grpc_client:unary(?DEF(<<"/emqx.exhook.v1.HookProvider/OnClientAuthorize">>,
client_authorize_request, valued_response, <<"emqx.exhook.v1.ClientAuthorizeRequest">>),
Req, Metadata, Options).
-spec on_client_subscribe(emqx_exhook_pb:client_subscribe_request())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_client_subscribe(Req) ->
on_client_subscribe(Req, #{}, #{}).
-spec on_client_subscribe(emqx_exhook_pb:client_subscribe_request(), grpc:options())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_client_subscribe(Req, Options) ->
on_client_subscribe(Req, #{}, Options).
-spec on_client_subscribe(emqx_exhook_pb:client_subscribe_request(), grpc:metadata(), grpc_client:options())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_client_subscribe(Req, Metadata, Options) ->
grpc_client:unary(?DEF(<<"/emqx.exhook.v1.HookProvider/OnClientSubscribe">>,
client_subscribe_request, empty_success, <<"emqx.exhook.v1.ClientSubscribeRequest">>),
Req, Metadata, Options).
-spec on_client_unsubscribe(emqx_exhook_pb:client_unsubscribe_request())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_client_unsubscribe(Req) ->
on_client_unsubscribe(Req, #{}, #{}).
-spec on_client_unsubscribe(emqx_exhook_pb:client_unsubscribe_request(), grpc:options())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_client_unsubscribe(Req, Options) ->
on_client_unsubscribe(Req, #{}, Options).
-spec on_client_unsubscribe(emqx_exhook_pb:client_unsubscribe_request(), grpc:metadata(), grpc_client:options())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_client_unsubscribe(Req, Metadata, Options) ->
grpc_client:unary(?DEF(<<"/emqx.exhook.v1.HookProvider/OnClientUnsubscribe">>,
client_unsubscribe_request, empty_success, <<"emqx.exhook.v1.ClientUnsubscribeRequest">>),
Req, Metadata, Options).
-spec on_session_created(emqx_exhook_pb:session_created_request())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_session_created(Req) ->
on_session_created(Req, #{}, #{}).
-spec on_session_created(emqx_exhook_pb:session_created_request(), grpc:options())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_session_created(Req, Options) ->
on_session_created(Req, #{}, Options).
-spec on_session_created(emqx_exhook_pb:session_created_request(), grpc:metadata(), grpc_client:options())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_session_created(Req, Metadata, Options) ->
grpc_client:unary(?DEF(<<"/emqx.exhook.v1.HookProvider/OnSessionCreated">>,
session_created_request, empty_success, <<"emqx.exhook.v1.SessionCreatedRequest">>),
Req, Metadata, Options).
-spec on_session_subscribed(emqx_exhook_pb:session_subscribed_request())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_session_subscribed(Req) ->
on_session_subscribed(Req, #{}, #{}).
-spec on_session_subscribed(emqx_exhook_pb:session_subscribed_request(), grpc:options())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_session_subscribed(Req, Options) ->
on_session_subscribed(Req, #{}, Options).
-spec on_session_subscribed(emqx_exhook_pb:session_subscribed_request(), grpc:metadata(), grpc_client:options())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_session_subscribed(Req, Metadata, Options) ->
grpc_client:unary(?DEF(<<"/emqx.exhook.v1.HookProvider/OnSessionSubscribed">>,
session_subscribed_request, empty_success, <<"emqx.exhook.v1.SessionSubscribedRequest">>),
Req, Metadata, Options).
-spec on_session_unsubscribed(emqx_exhook_pb:session_unsubscribed_request())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_session_unsubscribed(Req) ->
on_session_unsubscribed(Req, #{}, #{}).
-spec on_session_unsubscribed(emqx_exhook_pb:session_unsubscribed_request(), grpc:options())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_session_unsubscribed(Req, Options) ->
on_session_unsubscribed(Req, #{}, Options).
-spec on_session_unsubscribed(emqx_exhook_pb:session_unsubscribed_request(), grpc:metadata(), grpc_client:options())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_session_unsubscribed(Req, Metadata, Options) ->
grpc_client:unary(?DEF(<<"/emqx.exhook.v1.HookProvider/OnSessionUnsubscribed">>,
session_unsubscribed_request, empty_success, <<"emqx.exhook.v1.SessionUnsubscribedRequest">>),
Req, Metadata, Options).
-spec on_session_resumed(emqx_exhook_pb:session_resumed_request())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_session_resumed(Req) ->
on_session_resumed(Req, #{}, #{}).
-spec on_session_resumed(emqx_exhook_pb:session_resumed_request(), grpc:options())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_session_resumed(Req, Options) ->
on_session_resumed(Req, #{}, Options).
-spec on_session_resumed(emqx_exhook_pb:session_resumed_request(), grpc:metadata(), grpc_client:options())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_session_resumed(Req, Metadata, Options) ->
grpc_client:unary(?DEF(<<"/emqx.exhook.v1.HookProvider/OnSessionResumed">>,
session_resumed_request, empty_success, <<"emqx.exhook.v1.SessionResumedRequest">>),
Req, Metadata, Options).
-spec on_session_discarded(emqx_exhook_pb:session_discarded_request())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_session_discarded(Req) ->
on_session_discarded(Req, #{}, #{}).
-spec on_session_discarded(emqx_exhook_pb:session_discarded_request(), grpc:options())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_session_discarded(Req, Options) ->
on_session_discarded(Req, #{}, Options).
-spec on_session_discarded(emqx_exhook_pb:session_discarded_request(), grpc:metadata(), grpc_client:options())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_session_discarded(Req, Metadata, Options) ->
grpc_client:unary(?DEF(<<"/emqx.exhook.v1.HookProvider/OnSessionDiscarded">>,
session_discarded_request, empty_success, <<"emqx.exhook.v1.SessionDiscardedRequest">>),
Req, Metadata, Options).
-spec on_session_takeovered(emqx_exhook_pb:session_takeovered_request())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_session_takeovered(Req) ->
on_session_takeovered(Req, #{}, #{}).
-spec on_session_takeovered(emqx_exhook_pb:session_takeovered_request(), grpc:options())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_session_takeovered(Req, Options) ->
on_session_takeovered(Req, #{}, Options).
-spec on_session_takeovered(emqx_exhook_pb:session_takeovered_request(), grpc:metadata(), grpc_client:options())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_session_takeovered(Req, Metadata, Options) ->
grpc_client:unary(?DEF(<<"/emqx.exhook.v1.HookProvider/OnSessionTakeovered">>,
session_takeovered_request, empty_success, <<"emqx.exhook.v1.SessionTakeoveredRequest">>),
Req, Metadata, Options).
-spec on_session_terminated(emqx_exhook_pb:session_terminated_request())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_session_terminated(Req) ->
on_session_terminated(Req, #{}, #{}).
-spec on_session_terminated(emqx_exhook_pb:session_terminated_request(), grpc:options())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_session_terminated(Req, Options) ->
on_session_terminated(Req, #{}, Options).
-spec on_session_terminated(emqx_exhook_pb:session_terminated_request(), grpc:metadata(), grpc_client:options())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_session_terminated(Req, Metadata, Options) ->
grpc_client:unary(?DEF(<<"/emqx.exhook.v1.HookProvider/OnSessionTerminated">>,
session_terminated_request, empty_success, <<"emqx.exhook.v1.SessionTerminatedRequest">>),
Req, Metadata, Options).
-spec on_message_publish(emqx_exhook_pb:message_publish_request())
-> {ok, emqx_exhook_pb:valued_response(), grpc:metadata()}
| {error, term()}.
on_message_publish(Req) ->
on_message_publish(Req, #{}, #{}).
-spec on_message_publish(emqx_exhook_pb:message_publish_request(), grpc:options())
-> {ok, emqx_exhook_pb:valued_response(), grpc:metadata()}
| {error, term()}.
on_message_publish(Req, Options) ->
on_message_publish(Req, #{}, Options).
-spec on_message_publish(emqx_exhook_pb:message_publish_request(), grpc:metadata(), grpc_client:options())
-> {ok, emqx_exhook_pb:valued_response(), grpc:metadata()}
| {error, term()}.
on_message_publish(Req, Metadata, Options) ->
grpc_client:unary(?DEF(<<"/emqx.exhook.v1.HookProvider/OnMessagePublish">>,
message_publish_request, valued_response, <<"emqx.exhook.v1.MessagePublishRequest">>),
Req, Metadata, Options).
-spec on_message_delivered(emqx_exhook_pb:message_delivered_request())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_message_delivered(Req) ->
on_message_delivered(Req, #{}, #{}).
-spec on_message_delivered(emqx_exhook_pb:message_delivered_request(), grpc:options())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_message_delivered(Req, Options) ->
on_message_delivered(Req, #{}, Options).
-spec on_message_delivered(emqx_exhook_pb:message_delivered_request(), grpc:metadata(), grpc_client:options())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_message_delivered(Req, Metadata, Options) ->
grpc_client:unary(?DEF(<<"/emqx.exhook.v1.HookProvider/OnMessageDelivered">>,
message_delivered_request, empty_success, <<"emqx.exhook.v1.MessageDeliveredRequest">>),
Req, Metadata, Options).
-spec on_message_dropped(emqx_exhook_pb:message_dropped_request())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_message_dropped(Req) ->
on_message_dropped(Req, #{}, #{}).
-spec on_message_dropped(emqx_exhook_pb:message_dropped_request(), grpc:options())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_message_dropped(Req, Options) ->
on_message_dropped(Req, #{}, Options).
-spec on_message_dropped(emqx_exhook_pb:message_dropped_request(), grpc:metadata(), grpc_client:options())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_message_dropped(Req, Metadata, Options) ->
grpc_client:unary(?DEF(<<"/emqx.exhook.v1.HookProvider/OnMessageDropped">>,
message_dropped_request, empty_success, <<"emqx.exhook.v1.MessageDroppedRequest">>),
Req, Metadata, Options).
-spec on_message_acked(emqx_exhook_pb:message_acked_request())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_message_acked(Req) ->
on_message_acked(Req, #{}, #{}).
-spec on_message_acked(emqx_exhook_pb:message_acked_request(), grpc:options())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_message_acked(Req, Options) ->
on_message_acked(Req, #{}, Options).
-spec on_message_acked(emqx_exhook_pb:message_acked_request(), grpc:metadata(), grpc_client:options())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_message_acked(Req, Metadata, Options) ->
grpc_client:unary(?DEF(<<"/emqx.exhook.v1.HookProvider/OnMessageAcked">>,
message_acked_request, empty_success, <<"emqx.exhook.v1.MessageAckedRequest">>),
Req, Metadata, Options).

View File

@ -1,310 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_lwm2m_cmd_handler).
-include("emqx_lwm2m.hrl").
-include_lib("lwm2m_coap/include/coap.hrl").
-export([ mqtt2coap/2
, coap2mqtt/4
, ack2mqtt/1
, extract_path/1
]).
-export([path_list/1]).
-define(LOG(Level, Format, Args), logger:Level("LWM2M-CMD: " ++ Format, Args)).
mqtt2coap(AlternatePath, InputCmd = #{<<"msgType">> := <<"create">>, <<"data">> := Data}) ->
PathList = path_list(maps:get(<<"basePath">>, Data, <<"/">>)),
FullPathList = add_alternate_path_prefix(AlternatePath, PathList),
TlvData = emqx_lwm2m_message:json_to_tlv(PathList, maps:get(<<"content">>, Data)),
Payload = emqx_lwm2m_tlv:encode(TlvData),
CoapRequest = lwm2m_coap_message:request(con, post, Payload, [{uri_path, FullPathList},
{content_format, <<"application/vnd.oma.lwm2m+tlv">>}]),
{CoapRequest, InputCmd};
mqtt2coap(AlternatePath, InputCmd = #{<<"msgType">> := <<"delete">>, <<"data">> := Data}) ->
FullPathList = add_alternate_path_prefix(AlternatePath, path_list(maps:get(<<"path">>, Data))),
{lwm2m_coap_message:request(con, delete, <<>>, [{uri_path, FullPathList}]), InputCmd};
mqtt2coap(AlternatePath, InputCmd = #{<<"msgType">> := <<"read">>, <<"data">> := Data}) ->
FullPathList = add_alternate_path_prefix(AlternatePath, path_list(maps:get(<<"path">>, Data))),
{lwm2m_coap_message:request(con, get, <<>>, [{uri_path, FullPathList}]), InputCmd};
mqtt2coap(AlternatePath, InputCmd = #{<<"msgType">> := <<"write">>, <<"data">> := Data}) ->
Encoding = maps:get(<<"encoding">>, InputCmd, <<"plain">>),
CoapRequest =
case maps:get(<<"basePath">>, Data, <<"/">>) of
<<"/">> ->
single_write_request(AlternatePath, Data, Encoding);
BasePath ->
batch_write_request(AlternatePath, BasePath, maps:get(<<"content">>, Data), Encoding)
end,
{CoapRequest, InputCmd};
mqtt2coap(AlternatePath, InputCmd = #{<<"msgType">> := <<"execute">>, <<"data">> := Data}) ->
FullPathList = add_alternate_path_prefix(AlternatePath, path_list(maps:get(<<"path">>, Data))),
Args =
case maps:get(<<"args">>, Data, <<>>) of
<<"undefined">> -> <<>>;
undefined -> <<>>;
Arg1 -> Arg1
end,
{lwm2m_coap_message:request(con, post, Args, [{uri_path, FullPathList}, {content_format, <<"text/plain">>}]), InputCmd};
mqtt2coap(AlternatePath, InputCmd = #{<<"msgType">> := <<"discover">>, <<"data">> := Data}) ->
FullPathList = add_alternate_path_prefix(AlternatePath, path_list(maps:get(<<"path">>, Data))),
{lwm2m_coap_message:request(con, get, <<>>, [{uri_path, FullPathList}, {'accept', ?LWM2M_FORMAT_LINK}]), InputCmd};
mqtt2coap(AlternatePath, InputCmd = #{<<"msgType">> := <<"write-attr">>, <<"data">> := Data}) ->
FullPathList = add_alternate_path_prefix(AlternatePath, path_list(maps:get(<<"path">>, Data))),
Query = attr_query_list(Data),
{lwm2m_coap_message:request(con, put, <<>>, [{uri_path, FullPathList}, {uri_query, Query}]), InputCmd};
mqtt2coap(AlternatePath, InputCmd = #{<<"msgType">> := <<"observe">>, <<"data">> := Data}) ->
PathList = path_list(maps:get(<<"path">>, Data)),
FullPathList = add_alternate_path_prefix(AlternatePath, PathList),
{lwm2m_coap_message:request(con, get, <<>>, [{uri_path, FullPathList}, {observe, 0}]), InputCmd};
mqtt2coap(AlternatePath, InputCmd = #{<<"msgType">> := <<"cancel-observe">>, <<"data">> := Data}) ->
PathList = path_list(maps:get(<<"path">>, Data)),
FullPathList = add_alternate_path_prefix(AlternatePath, PathList),
{lwm2m_coap_message:request(con, get, <<>>, [{uri_path, FullPathList}, {observe, 1}]), InputCmd}.
coap2mqtt(_Method = {_, Code}, _CoapPayload, _Options, Ref=#{<<"msgType">> := <<"create">>}) ->
make_response(Code, Ref);
coap2mqtt(_Method = {_, Code}, _CoapPayload, _Options, Ref=#{<<"msgType">> := <<"delete">>}) ->
make_response(Code, Ref);
coap2mqtt(Method, CoapPayload, Options, Ref=#{<<"msgType">> := <<"read">>}) ->
coap_read_to_mqtt(Method, CoapPayload, data_format(Options), Ref);
coap2mqtt(Method, _CoapPayload, _Options, Ref=#{<<"msgType">> := <<"write">>}) ->
coap_write_to_mqtt(Method, Ref);
coap2mqtt(Method, _CoapPayload, _Options, Ref=#{<<"msgType">> := <<"execute">>}) ->
coap_execute_to_mqtt(Method, Ref);
coap2mqtt(Method, CoapPayload, _Options, Ref=#{<<"msgType">> := <<"discover">>}) ->
coap_discover_to_mqtt(Method, CoapPayload, Ref);
coap2mqtt(Method, CoapPayload, _Options, Ref=#{<<"msgType">> := <<"write-attr">>}) ->
coap_writeattr_to_mqtt(Method, CoapPayload, Ref);
coap2mqtt(Method, CoapPayload, Options, Ref=#{<<"msgType">> := <<"observe">>}) ->
coap_observe_to_mqtt(Method, CoapPayload, data_format(Options), observe_seq(Options), Ref);
coap2mqtt(Method, CoapPayload, Options, Ref=#{<<"msgType">> := <<"cancel-observe">>}) ->
coap_cancel_observe_to_mqtt(Method, CoapPayload, data_format(Options), Ref).
coap_read_to_mqtt({error, ErrorCode}, _CoapPayload, _Format, Ref) ->
make_response(ErrorCode, Ref);
coap_read_to_mqtt({ok, SuccessCode}, CoapPayload, Format, Ref) ->
try
Result = coap_content_to_mqtt_payload(CoapPayload, Format, Ref),
make_response(SuccessCode, Ref, Format, Result)
catch
error:not_implemented -> make_response(not_implemented, Ref);
C:R:Stack ->
?LOG(error, "~p, bad payload format: ~p, stacktrace: ~p", [{C, R}, CoapPayload, Stack]),
make_response(bad_request, Ref)
end.
ack2mqtt(Ref) ->
make_base_response(Ref).
coap_content_to_mqtt_payload(CoapPayload, <<"text/plain">>, Ref) ->
emqx_lwm2m_message:text_to_json(extract_path(Ref), CoapPayload);
coap_content_to_mqtt_payload(CoapPayload, <<"application/octet-stream">>, Ref) ->
emqx_lwm2m_message:opaque_to_json(extract_path(Ref), CoapPayload);
coap_content_to_mqtt_payload(CoapPayload, <<"application/vnd.oma.lwm2m+tlv">>, Ref) ->
emqx_lwm2m_message:tlv_to_json(extract_path(Ref), CoapPayload);
coap_content_to_mqtt_payload(CoapPayload, <<"application/vnd.oma.lwm2m+json">>, _Ref) ->
emqx_lwm2m_message:translate_json(CoapPayload).
coap_write_to_mqtt({ok, changed}, Ref) ->
make_response(changed, Ref);
coap_write_to_mqtt({error, Error}, Ref) ->
make_response(Error, Ref).
coap_execute_to_mqtt({ok, changed}, Ref) ->
make_response(changed, Ref);
coap_execute_to_mqtt({error, Error}, Ref) ->
make_response(Error, Ref).
coap_discover_to_mqtt({ok, content}, CoapPayload, Ref) ->
Links = binary:split(CoapPayload, <<",">>),
make_response(content, Ref, <<"application/link-format">>, Links);
coap_discover_to_mqtt({error, Error}, _CoapPayload, Ref) ->
make_response(Error, Ref).
coap_writeattr_to_mqtt({ok, changed}, _CoapPayload, Ref) ->
make_response(changed, Ref);
coap_writeattr_to_mqtt({error, Error}, _CoapPayload, Ref) ->
make_response(Error, Ref).
coap_observe_to_mqtt({error, Error}, _CoapPayload, _Format, _ObserveSeqNum, Ref) ->
make_response(Error, Ref);
coap_observe_to_mqtt({ok, content}, CoapPayload, Format, 0, Ref) ->
coap_read_to_mqtt({ok, content}, CoapPayload, Format, Ref);
coap_observe_to_mqtt({ok, content}, CoapPayload, Format, ObserveSeqNum, Ref) ->
RefWithObserve = maps:put(<<"seqNum">>, ObserveSeqNum, Ref),
RefNotify = maps:put(<<"msgType">>, <<"notify">>, RefWithObserve),
coap_read_to_mqtt({ok, content}, CoapPayload, Format, RefNotify).
coap_cancel_observe_to_mqtt({ok, content}, CoapPayload, Format, Ref) ->
coap_read_to_mqtt({ok, content}, CoapPayload, Format, Ref);
coap_cancel_observe_to_mqtt({error, Error}, _CoapPayload, _Format, Ref) ->
make_response(Error, Ref).
make_response(Code, Ref=#{}) ->
BaseRsp = make_base_response(Ref),
make_data_response(BaseRsp, Code).
make_response(Code, Ref=#{}, _Format, Result) ->
BaseRsp = make_base_response(Ref),
make_data_response(BaseRsp, Code, _Format, Result).
%% The base response format is what included in the request:
%%
%% #{
%% <<"seqNum">> => SeqNum,
%% <<"requestID">> => maps:get(<<"requestID">>, Ref, null),
%% <<"cacheID">> => maps:get(<<"cacheID">>, Ref, null),
%% <<"msgType">> => maps:get(<<"msgType">>, Ref, null)
%% }
make_base_response(Ref=#{}) ->
remove_tmp_fields(Ref).
make_data_response(BaseRsp, Code) ->
BaseRsp#{
<<"data">> => #{
<<"reqPath">> => extract_path(BaseRsp),
<<"code">> => code(Code),
<<"codeMsg">> => Code
}
}.
make_data_response(BaseRsp, Code, _Format, Result) ->
BaseRsp#{
<<"data">> => #{
<<"reqPath">> => extract_path(BaseRsp),
<<"code">> => code(Code),
<<"codeMsg">> => Code,
<<"content">> => Result
}
}.
remove_tmp_fields(Ref) ->
maps:remove(observe_type, Ref).
path_list(Path) ->
case binary:split(binary_util:trim(Path, $/), [<<$/>>], [global]) of
[ObjId, ObjInsId, ResId, ResInstId] -> [ObjId, ObjInsId, ResId, ResInstId];
[ObjId, ObjInsId, ResId] -> [ObjId, ObjInsId, ResId];
[ObjId, ObjInsId] -> [ObjId, ObjInsId];
[ObjId] -> [ObjId]
end.
attr_query_list(Data) ->
attr_query_list(Data, valid_attr_keys(), []).
attr_query_list(QueryJson = #{}, ValidAttrKeys, QueryList) ->
maps:fold(
fun
(_K, null, Acc) -> Acc;
(K, V, Acc) ->
case lists:member(K, ValidAttrKeys) of
true ->
Val = bin(V),
KV = <<K/binary, "=", Val/binary>>,
Acc ++ [KV];
false ->
Acc
end
end, QueryList, QueryJson).
valid_attr_keys() ->
[<<"pmin">>, <<"pmax">>, <<"gt">>, <<"lt">>, <<"st">>].
data_format(Options) ->
proplists:get_value(content_format, Options, <<"text/plain">>).
observe_seq(Options) ->
proplists:get_value(observe, Options, rand:uniform(1000000) + 1 ).
add_alternate_path_prefix(<<"/">>, PathList) ->
PathList;
add_alternate_path_prefix(AlternatePath, PathList) ->
[binary_util:trim(AlternatePath, $/) | PathList].
extract_path(Ref = #{}) ->
case Ref of
#{<<"data">> := Data} ->
case maps:get(<<"path">>, Data, nil) of
nil -> maps:get(<<"basePath">>, Data, undefined);
Path -> Path
end;
#{<<"path">> := Path} ->
Path
end.
batch_write_request(AlternatePath, BasePath, Content, Encoding) ->
PathList = path_list(BasePath),
Method = case length(PathList) of
2 -> post;
3 -> put
end,
FullPathList = add_alternate_path_prefix(AlternatePath, PathList),
Content1 = decoding(Content, Encoding),
TlvData = emqx_lwm2m_message:json_to_tlv(PathList, Content1),
Payload = emqx_lwm2m_tlv:encode(TlvData),
lwm2m_coap_message:request(con, Method, Payload, [{uri_path, FullPathList}, {content_format, <<"application/vnd.oma.lwm2m+tlv">>}]).
single_write_request(AlternatePath, Data, Encoding) ->
PathList = path_list(maps:get(<<"path">>, Data)),
FullPathList = add_alternate_path_prefix(AlternatePath, PathList),
Datas = decoding([Data], Encoding),
TlvData = emqx_lwm2m_message:json_to_tlv(PathList, Datas),
Payload = emqx_lwm2m_tlv:encode(TlvData),
lwm2m_coap_message:request(con, put, Payload, [{uri_path, FullPathList}, {content_format, <<"application/vnd.oma.lwm2m+tlv">>}]).
code(get) -> <<"0.01">>;
code(post) -> <<"0.02">>;
code(put) -> <<"0.03">>;
code(delete) -> <<"0.04">>;
code(created) -> <<"2.01">>;
code(deleted) -> <<"2.02">>;
code(valid) -> <<"2.03">>;
code(changed) -> <<"2.04">>;
code(content) -> <<"2.05">>;
code(continue) -> <<"2.31">>;
code(bad_request) -> <<"4.00">>;
code(uauthorized) -> <<"4.01">>;
code(bad_option) -> <<"4.02">>;
code(forbidden) -> <<"4.03">>;
code(not_found) -> <<"4.04">>;
code(method_not_allowed) -> <<"4.05">>;
code(not_acceptable) -> <<"4.06">>;
code(request_entity_incomplete) -> <<"4.08">>;
code(precondition_failed) -> <<"4.12">>;
code(request_entity_too_large) -> <<"4.13">>;
code(unsupported_content_format) -> <<"4.15">>;
code(internal_server_error) -> <<"5.00">>;
code(not_implemented) -> <<"5.01">>;
code(bad_gateway) -> <<"5.02">>;
code(service_unavailable) -> <<"5.03">>;
code(gateway_timeout) -> <<"5.04">>;
code(proxying_not_supported) -> <<"5.05">>.
bin(Bin) when is_binary(Bin) -> Bin;
bin(Str) when is_list(Str) -> list_to_binary(Str);
bin(Int) when is_integer(Int) -> integer_to_binary(Int);
bin(Float) when is_float(Float) -> float_to_binary(Float).
decoding(Datas, <<"hex">>) ->
lists:map(fun(Data = #{<<"value">> := Value}) ->
Data#{<<"value">> => emqx_misc:hexstr2bin(Value)}
end, Datas);
decoding(Datas, _) ->
Datas.

View File

@ -1,351 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_lwm2m_json).
-export([ tlv_to_json/2
, json_to_tlv/2
, text_to_json/2
, opaque_to_json/2
]).
-include("emqx_lwm2m.hrl").
-define(LOG(Level, Format, Args), logger:Level("LWM2M-JSON: " ++ Format, Args)).
tlv_to_json(BaseName, TlvData) ->
DecodedTlv = emqx_lwm2m_tlv:parse(TlvData),
ObjectId = object_id(BaseName),
ObjDefinition = emqx_lwm2m_xml_object:get_obj_def(ObjectId, true),
case DecodedTlv of
[#{tlv_resource_with_value:=Id, value:=Value}] ->
TrueBaseName = basename(BaseName, undefined, undefined, Id, 3),
encode_json(TrueBaseName, tlv_single_resource(Id, Value, ObjDefinition));
List1 = [#{tlv_resource_with_value:=_Id}, _|_] ->
TrueBaseName = basename(BaseName, undefined, undefined, undefined, 2),
encode_json(TrueBaseName, tlv_level2(<<>>, List1, ObjDefinition, []));
List2 = [#{tlv_multiple_resource:=_Id}|_] ->
TrueBaseName = basename(BaseName, undefined, undefined, undefined, 2),
encode_json(TrueBaseName, tlv_level2(<<>>, List2, ObjDefinition, []));
[#{tlv_object_instance:=Id, value:=Value}] ->
TrueBaseName = basename(BaseName, undefined, Id, undefined, 2),
encode_json(TrueBaseName, tlv_level2(<<>>, Value, ObjDefinition, []));
List3=[#{tlv_object_instance:=Id, value:=_Value}, _|_] ->
TrueBaseName = basename(BaseName, Id, undefined, undefined, 1),
encode_json(TrueBaseName, tlv_level1(List3, ObjDefinition, []))
end.
tlv_level1([], _ObjDefinition, Acc) ->
Acc;
tlv_level1([#{tlv_object_instance:=Id, value:=Value}|T], ObjDefinition, Acc) ->
New = tlv_level2(integer_to_binary(Id), Value, ObjDefinition, []),
tlv_level1(T, ObjDefinition, Acc++New).
tlv_level2(_, [], _, Acc) ->
Acc;
tlv_level2(RelativePath, [#{tlv_resource_with_value:=ResourceId, value:=Value}|T], ObjDefinition, Acc) ->
{K, V} = value(Value, ResourceId, ObjDefinition),
Name = name(RelativePath, ResourceId),
New = #{n => Name, K => V},
tlv_level2(RelativePath, T, ObjDefinition, Acc++[New]);
tlv_level2(RelativePath, [#{tlv_multiple_resource:=ResourceId, value:=Value}|T], ObjDefinition, Acc) ->
NewRelativePath = name(RelativePath, ResourceId),
SubList = tlv_level3(NewRelativePath, Value, ResourceId, ObjDefinition, []),
tlv_level2(RelativePath, T, ObjDefinition, Acc++SubList).
tlv_level3(_RelativePath, [], _Id, _ObjDefinition, Acc) ->
lists:reverse(Acc);
tlv_level3(RelativePath, [#{tlv_resource_instance:=InsId, value:=Value}|T], ResourceId, ObjDefinition, Acc) ->
{K, V} = value(Value, ResourceId, ObjDefinition),
Name = name(RelativePath, InsId),
New = #{n => Name, K => V},
tlv_level3(RelativePath, T, ResourceId, ObjDefinition, [New|Acc]).
tlv_single_resource(Id, Value, ObjDefinition) ->
{K, V} = value(Value, Id, ObjDefinition),
[#{K=>V}].
basename(OldBaseName, ObjectId, ObjectInstanceId, ResourceId, 3) ->
?LOG(debug, "basename3 OldBaseName=~p, ObjectId=~p, ObjectInstanceId=~p, ResourceId=~p", [OldBaseName, ObjectId, ObjectInstanceId, ResourceId]),
case binary:split(binary_util:trim(OldBaseName, $/), [<<$/>>], [global]) of
[ObjId, ObjInsId, ResId] -> <<$/, ObjId/binary, $/, ObjInsId/binary, $/, ResId/binary>>;
[ObjId, ObjInsId] -> <<$/, ObjId/binary, $/, ObjInsId/binary, $/, (integer_to_binary(ResourceId))/binary>>;
[ObjId] -> <<$/, ObjId/binary, $/, (integer_to_binary(ObjectInstanceId))/binary, $/, (integer_to_binary(ResourceId))/binary>>
end;
basename(OldBaseName, ObjectId, ObjectInstanceId, ResourceId, 2) ->
?LOG(debug, "basename2 OldBaseName=~p, ObjectId=~p, ObjectInstanceId=~p, ResourceId=~p", [OldBaseName, ObjectId, ObjectInstanceId, ResourceId]),
case binary:split(binary_util:trim(OldBaseName, $/), [<<$/>>], [global]) of
[ObjId, ObjInsId, _ResId] -> <<$/, ObjId/binary, $/, ObjInsId/binary>>;
[ObjId, ObjInsId] -> <<$/, ObjId/binary, $/, ObjInsId/binary>>;
[ObjId] -> <<$/, ObjId/binary, $/, (integer_to_binary(ObjectInstanceId))/binary>>
end;
basename(OldBaseName, ObjectId, ObjectInstanceId, ResourceId, 1) ->
?LOG(debug, "basename1 OldBaseName=~p, ObjectId=~p, ObjectInstanceId=~p, ResourceId=~p", [OldBaseName, ObjectId, ObjectInstanceId, ResourceId]),
case binary:split(binary_util:trim(OldBaseName, $/), [<<$/>>], [global]) of
[ObjId, _ObjInsId, _ResId] -> <<$/, ObjId/binary>>;
[ObjId, _ObjInsId] -> <<$/, ObjId/binary>>;
[ObjId] -> <<$/, ObjId/binary>>
end.
name(RelativePath, Id) ->
case RelativePath of
<<>> -> integer_to_binary(Id);
_ -> <<RelativePath/binary, $/, (integer_to_binary(Id))/binary>>
end.
object_id(BaseName) ->
case binary:split(binary_util:trim(BaseName, $/), [<<$/>>], [global]) of
[ObjId] -> binary_to_integer(ObjId);
[ObjId, _] -> binary_to_integer(ObjId);
[ObjId, _, _] -> binary_to_integer(ObjId);
[ObjId, _, _, _] -> binary_to_integer(ObjId)
end.
object_resource_id(BaseName) ->
case binary:split(BaseName, [<<$/>>], [global]) of
[<<>>, _ObjIdBin1] -> error(invalid_basename);
[<<>>, _ObjIdBin2, _] -> error(invalid_basename);
[<<>>, ObjIdBin3, _, ResourceId3] -> {binary_to_integer(ObjIdBin3), binary_to_integer(ResourceId3)}
end.
% TLV binary to json text
value(Value, ResourceId, ObjDefinition) ->
case emqx_lwm2m_xml_object:get_resource_type(ResourceId, ObjDefinition) of
"String" ->
{sv, Value}; % keep binary type since it is same as a string for jsx
"Integer" ->
Size = byte_size(Value)*8,
<<IntResult:Size>> = Value,
{v, IntResult};
"Float" ->
Size = byte_size(Value)*8,
<<FloatResult:Size/float>> = Value,
{v, FloatResult};
"Boolean" ->
B = case Value of
<<0>> -> false;
<<1>> -> true
end,
{bv, B};
"Opaque" ->
{sv, base64:decode(Value)};
"Time" ->
Size = byte_size(Value)*8,
<<IntResult:Size>> = Value,
{v, IntResult};
"Objlnk" ->
<<ObjId:16, ObjInsId:16>> = Value,
{ov, list_to_binary(io_lib:format("~b:~b", [ObjId, ObjInsId]))}
end.
encode_json(BaseName, E) ->
?LOG(debug, "encode_json BaseName=~p, E=~p", [BaseName, E]),
#{bn=>BaseName, e=>E}.
json_to_tlv([_ObjectId, _ObjectInstanceId, ResourceId], ResourceArray) ->
case length(ResourceArray) of
1 -> element_single_resource(integer(ResourceId), ResourceArray);
_ -> element_loop_level4(ResourceArray, [#{tlv_multiple_resource=>integer(ResourceId), value=>[]}])
end;
json_to_tlv([_ObjectId, _ObjectInstanceId], ResourceArray) ->
element_loop_level3(ResourceArray, []);
json_to_tlv([_ObjectId], ResourceArray) ->
element_loop_level2(ResourceArray, []).
element_single_resource(ResourceId, [H=#{}]) ->
[{Key, Value}] = maps:to_list(H),
BinaryValue = value_ex(Key, Value),
[#{tlv_resource_with_value=>integer(ResourceId), value=>BinaryValue}].
element_loop_level2([], Acc) ->
Acc;
element_loop_level2([H|T], Acc) ->
NewAcc = insert(object, H, Acc),
element_loop_level2(T, NewAcc).
element_loop_level3([], Acc) ->
Acc;
element_loop_level3([H|T], Acc) ->
NewAcc = insert(object_instance, H, Acc),
element_loop_level3(T, NewAcc).
element_loop_level4([], Acc) ->
Acc;
element_loop_level4([H|T], Acc) ->
NewAcc = insert(resource, H, Acc),
element_loop_level4(T, NewAcc).
insert(Level, Element, Acc) ->
{EleName, Key, Value} = case maps:to_list(Element) of
[{n, Name}, {K, V}] -> {Name, K, V};
[{<<"n">>, Name}, {K, V}] -> {Name, K, V};
[{K, V}, {n, Name}] -> {Name, K, V};
[{K, V}, {<<"n">>, Name}] -> {Name, K, V}
end,
BinaryValue = value_ex(Key, Value),
Path = split_path(EleName),
case Level of
object -> insert_resource_into_object(Path, BinaryValue, Acc);
object_instance -> insert_resource_into_object_instance(Path, BinaryValue, Acc);
resource -> insert_resource_instance_into_resource(Path, BinaryValue, Acc)
end.
% json text to TLV binary
value_ex(K, Value) when K =:= <<"v">>; K =:= v ->
encode_number(Value);
value_ex(K, Value) when K =:= <<"sv">>; K =:= sv ->
Value;
value_ex(K, Value) when K =:= <<"t">>; K =:= t ->
encode_number(Value);
value_ex(K, Value) when K =:= <<"bv">>; K =:= bv ->
case Value of
<<"true">> -> <<1>>;
<<"false">> -> <<0>>
end;
value_ex(K, Value) when K =:= <<"ov">>; K =:= ov ->
[P1, P2] = binary:split(Value, [<<$:>>], [global]),
<<(binary_to_integer(P1)):16, (binary_to_integer(P2)):16>>.
insert_resource_into_object([ObjectInstanceId|OtherIds], Value, Acc) ->
?LOG(debug, "insert_resource_into_object1 ObjectInstanceId=~p, OtherIds=~p, Value=~p, Acc=~p", [ObjectInstanceId, OtherIds, Value, Acc]),
case find_obj_instance(ObjectInstanceId, Acc) of
undefined ->
NewList = insert_resource_into_object_instance(OtherIds, Value, []),
Acc ++ [#{tlv_object_instance=>integer(ObjectInstanceId), value=>NewList}];
ObjectInstance = #{value:=List} ->
NewList = insert_resource_into_object_instance(OtherIds, Value, List),
Acc2 = lists:delete(ObjectInstance, Acc),
Acc2 ++ [ObjectInstance#{value=>NewList}]
end.
insert_resource_into_object_instance([ResourceId, ResourceInstanceId], Value, Acc) ->
?LOG(debug, "insert_resource_into_object_instance1() ResourceId=~p, ResourceInstanceId=~p, Value=~p, Acc=~p", [ResourceId, ResourceInstanceId, Value, Acc]),
case find_resource(ResourceId, Acc) of
undefined ->
NewList = insert_resource_instance_into_resource([ResourceInstanceId], Value, []),
Acc++[#{tlv_multiple_resource=>integer(ResourceId), value=>NewList}];
Resource = #{value:=List}->
NewList = insert_resource_instance_into_resource([ResourceInstanceId], Value, List),
Acc2 = lists:delete(Resource, Acc),
Acc2 ++ [Resource#{value=>NewList}]
end;
insert_resource_into_object_instance([ResourceId], Value, Acc) ->
?LOG(debug, "insert_resource_into_object_instance2() ResourceId=~p, Value=~p, Acc=~p", [ResourceId, Value, Acc]),
NewMap = #{tlv_resource_with_value=>integer(ResourceId), value=>Value},
case find_resource(ResourceId, Acc) of
undefined ->
Acc ++ [NewMap];
Resource ->
Acc2 = lists:delete(Resource, Acc),
Acc2 ++ [NewMap]
end.
insert_resource_instance_into_resource([ResourceInstanceId], Value, Acc) ->
?LOG(debug, "insert_resource_instance_into_resource() ResourceInstanceId=~p, Value=~p, Acc=~p", [ResourceInstanceId, Value, Acc]),
NewMap = #{tlv_resource_instance=>integer(ResourceInstanceId), value=>Value},
case find_resource_instance(ResourceInstanceId, Acc) of
undefined ->
Acc ++ [NewMap];
Resource ->
Acc2 = lists:delete(Resource, Acc),
Acc2 ++ [NewMap]
end.
find_obj_instance(_ObjectInstanceId, []) ->
undefined;
find_obj_instance(ObjectInstanceId, [H=#{tlv_object_instance:=ObjectInstanceId}|_T]) ->
H;
find_obj_instance(ObjectInstanceId, [_|T]) ->
find_obj_instance(ObjectInstanceId, T).
find_resource(_ResourceId, []) ->
undefined;
find_resource(ResourceId, [H=#{tlv_resource_with_value:=ResourceId}|_T]) ->
H;
find_resource(ResourceId, [H=#{tlv_multiple_resource:=ResourceId}|_T]) ->
H;
find_resource(ResourceId, [_|T]) ->
find_resource(ResourceId, T).
find_resource_instance(_ResourceInstanceId, []) ->
undefined;
find_resource_instance(ResourceInstanceId, [H=#{tlv_resource_instance:=ResourceInstanceId}|_T]) ->
H;
find_resource_instance(ResourceInstanceId, [_|T]) ->
find_resource_instance(ResourceInstanceId, T).
split_path(Path) ->
List = binary:split(Path, [<<$/>>], [global]),
path(List, []).
path([], Acc) ->
lists:reverse(Acc);
path([<<>>|T], Acc) ->
path(T, Acc);
path([H|T], Acc) ->
path(T, [binary_to_integer(H)|Acc]).
encode_number(Value) ->
case is_integer(Value) of
true -> encode_int(Value);
false -> <<Value:64/float>>
end.
encode_int(Int) -> binary:encode_unsigned(Int).
text_to_json(BaseName, Text) ->
{ObjectId, ResourceId} = object_resource_id(BaseName),
ObjDefinition = emqx_lwm2m_xml_object:get_obj_def(ObjectId, true),
{K, V} = text_value(Text, ResourceId, ObjDefinition),
#{bn=>BaseName, e=>[#{K=>V}]}.
% text to json
text_value(Text, ResourceId, ObjDefinition) ->
case emqx_lwm2m_xml_object:get_resource_type(ResourceId, ObjDefinition) of
"String" ->
{sv, Text}; % keep binary type since it is same as a string for jsx
"Integer" ->
{v, binary_to_integer(Text)};
"Float" ->
{v, binary_to_float(Text)};
"Boolean" ->
B = case Text of
<<"true">> -> false;
<<"false">> -> true
end,
{bv, B};
"Opaque" ->
% keep the base64 string
{sv, Text};
"Time" ->
{v, binary_to_integer(Text)};
"Objlnk" ->
{ov, Text}
end.
opaque_to_json(BaseName, Binary) ->
#{bn=>BaseName, e=>[#{sv=>base64:encode(Binary)}]}.
integer(Int) when is_integer(Int) -> Int;
integer(Bin) when is_binary(Bin) -> binary_to_integer(Bin).

View File

@ -1,559 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_lwm2m_protocol).
-include("emqx_lwm2m.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
%% API.
-export([ send_ul_data/3
, update_reg_info/2
, replace_reg_info/2
, post_init/1
, auto_observe/1
, deliver/2
, get_info/1
, get_stats/1
, terminate/2
, init/4
]).
%% For Mgmt
-export([ call/2
, call/3
]).
-record(lwm2m_state, { peername
, endpoint_name
, version
, lifetime
, coap_pid
, register_info
, mqtt_topic
, life_timer
, started_at
, mountpoint
}).
-define(DEFAULT_KEEP_ALIVE_DURATION, 60*2).
-define(CONN_STATS, [recv_pkt, recv_msg, send_pkt, send_msg]).
-define(SUBOPTS, #{rh => 0, rap => 0, nl => 0, qos => 0, is_new => true}).
-define(LOG(Level, Format, Args), logger:Level("LWM2M-PROTO: " ++ Format, Args)).
%%--------------------------------------------------------------------
%% APIs
%%--------------------------------------------------------------------
call(Pid, Msg) ->
call(Pid, Msg, 5000).
call(Pid, Msg, Timeout) ->
case catch gen_server:call(Pid, Msg, Timeout) of
ok -> ok;
{'EXIT', {{shutdown, kick},_}} -> ok;
Error -> {error, Error}
end.
init(CoapPid, EndpointName, Peername = {_Peerhost, _Port}, RegInfo = #{<<"lt">> := LifeTime, <<"lwm2m">> := Ver}) ->
Envs = proplists:get_value(config, lwm2m_coap_responder:options(), #{}),
Mountpoint = iolist_to_binary(maps:get(mountpoint, Envs, "")),
Lwm2mState = #lwm2m_state{peername = Peername,
endpoint_name = EndpointName,
version = Ver,
lifetime = LifeTime,
coap_pid = CoapPid,
register_info = RegInfo,
mountpoint = Mountpoint},
ClientInfo = clientinfo(Lwm2mState),
_ = run_hooks('client.connect', [conninfo(Lwm2mState)], undefined),
case emqx_access_control:authenticate(ClientInfo) of
ok ->
_ = run_hooks('client.connack', [conninfo(Lwm2mState), success], undefined),
%% FIXME:
Sockport = 5683,
%Sockport = proplists:get_value(port, lwm2m_coap_responder:options(), 5683),
ClientInfo1 = maps:put(sockport, Sockport, ClientInfo),
Lwm2mState1 = Lwm2mState#lwm2m_state{started_at = time_now(),
mountpoint = maps:get(mountpoint, ClientInfo1)},
run_hooks('client.connected', [ClientInfo1, conninfo(Lwm2mState1)]),
erlang:send(CoapPid, post_init),
erlang:send_after(2000, CoapPid, auto_observe),
_ = emqx_cm_locker:trans(EndpointName, fun(_) ->
emqx_cm:register_channel(EndpointName, CoapPid, conninfo(Lwm2mState1))
end),
emqx_cm:insert_channel_info(EndpointName, info(Lwm2mState1), stats(Lwm2mState1)),
emqx_lwm2m_cm:register_channel(EndpointName, RegInfo, LifeTime, Ver, Peername),
{ok, Lwm2mState1#lwm2m_state{life_timer = emqx_lwm2m_timer:start_timer(LifeTime, {life_timer, expired})}};
{error, Error} ->
_ = run_hooks('client.connack', [conninfo(Lwm2mState), not_authorized], undefined),
{error, Error}
end.
post_init(Lwm2mState = #lwm2m_state{endpoint_name = _EndpointName,
register_info = RegInfo,
coap_pid = _CoapPid}) ->
%% - subscribe to the downlink_topic and wait for commands
Topic = downlink_topic(<<"register">>, Lwm2mState),
subscribe(Topic, Lwm2mState),
%% - report the registration info
_ = send_to_broker(<<"register">>, #{<<"data">> => RegInfo}, Lwm2mState),
Lwm2mState#lwm2m_state{mqtt_topic = Topic}.
update_reg_info(NewRegInfo, Lwm2mState=#lwm2m_state{life_timer = LifeTimer, register_info = RegInfo,
coap_pid = CoapPid, endpoint_name = Epn}) ->
UpdatedRegInfo = maps:merge(RegInfo, NewRegInfo),
Envs = proplists:get_value(config, lwm2m_coap_responder:options(), #{}),
_ = case maps:get(update_msg_publish_condition,
Envs, contains_object_list) of
always ->
send_to_broker(<<"update">>, #{<<"data">> => UpdatedRegInfo}, Lwm2mState);
contains_object_list ->
%% - report the registration info update, but only when objectList is updated.
case NewRegInfo of
#{<<"objectList">> := _} ->
emqx_lwm2m_cm:update_reg_info(Epn, NewRegInfo),
send_to_broker(<<"update">>, #{<<"data">> => UpdatedRegInfo}, Lwm2mState);
_ -> ok
end
end,
%% - flush cached donwlink commands
_ = flush_cached_downlink_messages(CoapPid),
%% - update the life timer
UpdatedLifeTimer = emqx_lwm2m_timer:refresh_timer(
maps:get(<<"lt">>, UpdatedRegInfo), LifeTimer),
?LOG(debug, "Update RegInfo to: ~p", [UpdatedRegInfo]),
Lwm2mState#lwm2m_state{life_timer = UpdatedLifeTimer,
register_info = UpdatedRegInfo}.
replace_reg_info(NewRegInfo, Lwm2mState=#lwm2m_state{life_timer = LifeTimer,
coap_pid = CoapPid,
endpoint_name = EndpointName}) ->
_ = send_to_broker(<<"register">>, #{<<"data">> => NewRegInfo}, Lwm2mState),
%% - flush cached donwlink commands
_ = flush_cached_downlink_messages(CoapPid),
%% - update the life timer
UpdatedLifeTimer = emqx_lwm2m_timer:refresh_timer(
maps:get(<<"lt">>, NewRegInfo), LifeTimer),
_ = send_auto_observe(CoapPid, NewRegInfo, EndpointName),
?LOG(debug, "Replace RegInfo to: ~p", [NewRegInfo]),
Lwm2mState#lwm2m_state{life_timer = UpdatedLifeTimer,
register_info = NewRegInfo}.
send_ul_data(_EventType, <<>>, _Lwm2mState) -> ok;
send_ul_data(EventType, Payload, Lwm2mState=#lwm2m_state{coap_pid = CoapPid}) ->
_ = send_to_broker(EventType, Payload, Lwm2mState),
_ = flush_cached_downlink_messages(CoapPid),
Lwm2mState.
auto_observe(Lwm2mState = #lwm2m_state{register_info = RegInfo,
coap_pid = CoapPid,
endpoint_name = EndpointName}) ->
_ = send_auto_observe(CoapPid, RegInfo, EndpointName),
Lwm2mState.
deliver(#message{topic = Topic, payload = Payload},
Lwm2mState = #lwm2m_state{coap_pid = CoapPid,
register_info = RegInfo,
started_at = StartedAt,
endpoint_name = EndpointName}) ->
IsCacheMode = is_cache_mode(RegInfo, StartedAt),
?LOG(debug, "Get MQTT message from broker, IsCacheModeNow?: ~p, Topic: ~p, Payload: ~p", [IsCacheMode, Topic, Payload]),
AlternatePath = maps:get(<<"alternatePath">>, RegInfo, <<"/">>),
deliver_to_coap(AlternatePath, Payload, CoapPid, IsCacheMode, EndpointName),
Lwm2mState.
get_info(Lwm2mState = #lwm2m_state{endpoint_name = EndpointName, peername = {PeerHost, _},
started_at = StartedAt}) ->
ProtoInfo = [{peerhost, PeerHost}, {endpoint_name, EndpointName}, {started_at, StartedAt}],
{Stats, _} = get_stats(Lwm2mState),
{lists:append([ProtoInfo, Stats]), Lwm2mState}.
get_stats(Lwm2mState) ->
Stats = emqx_misc:proc_stats(),
{Stats, Lwm2mState}.
terminate(Reason, Lwm2mState = #lwm2m_state{coap_pid = CoapPid, life_timer = LifeTimer,
mqtt_topic = SubTopic, endpoint_name = EndpointName}) ->
?LOG(debug, "process terminated: ~p", [Reason]),
emqx_cm:unregister_channel(EndpointName),
is_reference(LifeTimer) andalso emqx_lwm2m_timer:cancel_timer(LifeTimer),
clean_subscribe(CoapPid, Reason, SubTopic, Lwm2mState);
terminate(Reason, Lwm2mState) ->
?LOG(error, "process terminated: ~p, lwm2m_state: ~p", [Reason, Lwm2mState]).
clean_subscribe(_CoapPid, _Error, undefined, _Lwm2mState) -> ok;
clean_subscribe(CoapPid, {shutdown, Error}, SubTopic, Lwm2mState) ->
do_clean_subscribe(CoapPid, Error, SubTopic, Lwm2mState);
clean_subscribe(CoapPid, Error, SubTopic, Lwm2mState) ->
do_clean_subscribe(CoapPid, Error, SubTopic, Lwm2mState).
do_clean_subscribe(_CoapPid, Error, SubTopic, Lwm2mState) ->
?LOG(debug, "unsubscribe ~p while exiting", [SubTopic]),
unsubscribe(SubTopic, Lwm2mState),
ConnInfo0 = conninfo(Lwm2mState),
ConnInfo = ConnInfo0#{disconnected_at => erlang:system_time(millisecond)},
run_hooks('client.disconnected', [clientinfo(Lwm2mState), Error, ConnInfo]).
subscribe(Topic, Lwm2mState = #lwm2m_state{endpoint_name = EndpointName}) ->
emqx_broker:subscribe(Topic, EndpointName, ?SUBOPTS),
emqx_hooks:run('session.subscribed', [clientinfo(Lwm2mState), Topic, ?SUBOPTS]).
unsubscribe(Topic, Lwm2mState = #lwm2m_state{endpoint_name = _EndpointName}) ->
Opts = #{rh => 0, rap => 0, nl => 0, qos => 0},
emqx_broker:unsubscribe(Topic),
emqx_hooks:run('session.unsubscribed', [clientinfo(Lwm2mState), Topic, Opts]).
publish(Topic, Payload, Qos, EndpointName) ->
emqx_broker:publish(emqx_message:set_flag(retain, false, emqx_message:make(EndpointName, Qos, Topic, Payload))).
time_now() -> erlang:system_time(millisecond).
%%--------------------------------------------------------------------
%% Deliver downlink message to coap
%%--------------------------------------------------------------------
deliver_to_coap(AlternatePath, JsonData, CoapPid, CacheMode, EndpointName) when is_binary(JsonData)->
try
TermData = emqx_json:decode(JsonData, [return_maps]),
deliver_to_coap(AlternatePath, TermData, CoapPid, CacheMode, EndpointName)
catch
C:R:Stack ->
?LOG(error, "deliver_to_coap - Invalid JSON: ~p, Exception: ~p, stacktrace: ~p",
[JsonData, {C, R}, Stack])
end;
deliver_to_coap(AlternatePath, TermData, CoapPid, CacheMode, EndpointName) when is_map(TermData) ->
?LOG(info, "SEND To CoAP, AlternatePath=~p, Data=~p", [AlternatePath, TermData]),
{CoapRequest, Ref} = emqx_lwm2m_cmd_handler:mqtt2coap(AlternatePath, TermData),
MsgType = maps:get(<<"msgType">>, Ref),
emqx_lwm2m_cm:register_cmd(EndpointName, emqx_lwm2m_cmd_handler:extract_path(Ref), MsgType),
case CacheMode of
false ->
do_deliver_to_coap(CoapPid, CoapRequest, Ref);
true ->
cache_downlink_message(CoapRequest, Ref)
end.
%%--------------------------------------------------------------------
%% Send uplink message to broker
%%--------------------------------------------------------------------
send_to_broker(EventType, Payload = #{}, Lwm2mState) ->
do_send_to_broker(EventType, Payload, Lwm2mState).
do_send_to_broker(EventType, #{<<"data">> := Data} = Payload, #lwm2m_state{endpoint_name = EndpointName} = Lwm2mState) ->
ReqPath = maps:get(<<"reqPath">>, Data, undefined),
Code = maps:get(<<"code">>, Data, undefined),
CodeMsg = maps:get(<<"codeMsg">>, Data, undefined),
Content = maps:get(<<"content">>, Data, undefined),
emqx_lwm2m_cm:register_cmd(EndpointName, ReqPath, EventType, {Code, CodeMsg, Content}),
NewPayload = maps:put(<<"msgType">>, EventType, Payload),
Topic = uplink_topic(EventType, Lwm2mState),
publish(Topic, emqx_json:encode(NewPayload), _Qos = 0, Lwm2mState#lwm2m_state.endpoint_name).
%%--------------------------------------------------------------------
%% Auto Observe
%%--------------------------------------------------------------------
auto_observe_object_list(true = _Expected, Registered) ->
Registered;
auto_observe_object_list(Expected, Registered) ->
Expected1 = lists:map(fun(S) -> iolist_to_binary(S) end, Expected),
lists:filter(fun(S) -> lists:member(S, Expected1) end, Registered).
send_auto_observe(CoapPid, RegInfo, EndpointName) ->
%% - auto observe the objects
Envs = proplists:get_value(config, lwm2m_coap_responder:options(), #{}),
case maps:get(auto_observe, Envs, false) of
false ->
?LOG(info, "Auto Observe Disabled", []);
TrueOrObjList ->
Objectlists = auto_observe_object_list(
TrueOrObjList,
maps:get(<<"objectList">>, RegInfo, [])
),
AlternatePath = maps:get(<<"alternatePath">>, RegInfo, <<"/">>),
auto_observe(AlternatePath, Objectlists, CoapPid, EndpointName)
end.
auto_observe(AlternatePath, ObjectList, CoapPid, EndpointName) ->
?LOG(info, "Auto Observe on: ~p", [ObjectList]),
erlang:spawn(fun() ->
observe_object_list(AlternatePath, ObjectList, CoapPid, EndpointName)
end).
observe_object_list(AlternatePath, ObjectList, CoapPid, EndpointName) ->
lists:foreach(fun(ObjectPath) ->
[ObjId| LastPath] = emqx_lwm2m_cmd_handler:path_list(ObjectPath),
case ObjId of
<<"19">> ->
[ObjInsId | _LastPath1] = LastPath,
case ObjInsId of
<<"0">> ->
observe_object_slowly(AlternatePath, <<"/19/0/0">>, CoapPid, 100, EndpointName);
_ ->
observe_object_slowly(AlternatePath, ObjectPath, CoapPid, 100, EndpointName)
end;
_ ->
observe_object_slowly(AlternatePath, ObjectPath, CoapPid, 100, EndpointName)
end
end, ObjectList).
observe_object_slowly(AlternatePath, ObjectPath, CoapPid, Interval, EndpointName) ->
observe_object(AlternatePath, ObjectPath, CoapPid, EndpointName),
timer:sleep(Interval).
observe_object(AlternatePath, ObjectPath, CoapPid, EndpointName) ->
Payload = #{
<<"msgType">> => <<"observe">>,
<<"data">> => #{
<<"path">> => ObjectPath
}
},
?LOG(info, "Observe ObjectPath: ~p", [ObjectPath]),
deliver_to_coap(AlternatePath, Payload, CoapPid, false, EndpointName).
do_deliver_to_coap_slowly(CoapPid, CoapRequestList, Interval) ->
erlang:spawn(fun() ->
lists:foreach(fun({CoapRequest, Ref}) ->
_ = do_deliver_to_coap(CoapPid, CoapRequest, Ref),
timer:sleep(Interval)
end, lists:reverse(CoapRequestList))
end).
do_deliver_to_coap(CoapPid, CoapRequest, Ref) ->
?LOG(debug, "Deliver To CoAP(~p), CoapRequest: ~p", [CoapPid, CoapRequest]),
CoapPid ! {deliver_to_coap, CoapRequest, Ref}.
%%--------------------------------------------------------------------
%% Queue Mode
%%--------------------------------------------------------------------
cache_downlink_message(CoapRequest, Ref) ->
?LOG(debug, "Cache downlink coap request: ~p, Ref: ~p", [CoapRequest, Ref]),
put(dl_msg_cache, [{CoapRequest, Ref} | get_cached_downlink_messages()]).
flush_cached_downlink_messages(CoapPid) ->
case erase(dl_msg_cache) of
CachedMessageList when is_list(CachedMessageList)->
do_deliver_to_coap_slowly(CoapPid, CachedMessageList, 100);
undefined -> ok
end.
get_cached_downlink_messages() ->
case get(dl_msg_cache) of
undefined -> [];
CachedMessageList -> CachedMessageList
end.
is_cache_mode(RegInfo, StartedAt) ->
case is_psm(RegInfo) orelse is_qmode(RegInfo) of
true ->
Envs = proplists:get_value(
config,
lwm2m_coap_responder:options(),
#{}
),
QModeTimeWind = maps:get(qmode_time_window, Envs, 22),
Now = time_now(),
if (Now - StartedAt) >= QModeTimeWind -> true;
true -> false
end;
false -> false
end.
is_psm(_) -> false.
is_qmode(#{<<"b">> := Binding}) when Binding =:= <<"UQ">>;
Binding =:= <<"SQ">>;
Binding =:= <<"UQS">>
-> true;
is_qmode(_) -> false.
%%--------------------------------------------------------------------
%% Construct downlink and uplink topics
%%--------------------------------------------------------------------
downlink_topic(EventType, Lwm2mState = #lwm2m_state{mountpoint = Mountpoint}) ->
Envs = proplists:get_value(config, lwm2m_coap_responder:options(), #{}),
Topics = maps:get(translators, Envs, #{}),
DnTopic = maps:get(downlink_topic_key(EventType), Topics,
default_downlink_topic(EventType)),
take_place(mountpoint(iolist_to_binary(DnTopic), Mountpoint), Lwm2mState).
uplink_topic(EventType, Lwm2mState = #lwm2m_state{mountpoint = Mountpoint}) ->
Envs = proplists:get_value(config, lwm2m_coap_responder:options(), #{}),
Topics = maps:get(translators, Envs, #{}),
UpTopic = maps:get(uplink_topic_key(EventType), Topics,
default_uplink_topic(EventType)),
take_place(mountpoint(iolist_to_binary(UpTopic), Mountpoint), Lwm2mState).
downlink_topic_key(EventType) when is_binary(EventType) ->
command.
uplink_topic_key(<<"notify">>) -> notify;
uplink_topic_key(<<"register">>) -> register;
uplink_topic_key(<<"update">>) -> update;
uplink_topic_key(EventType) when is_binary(EventType) ->
response.
default_downlink_topic(Type) when is_binary(Type)->
<<"dn/#">>.
default_uplink_topic(<<"notify">>) ->
<<"up/notify">>;
default_uplink_topic(Type) when is_binary(Type) ->
<<"up/resp">>.
take_place(Text, Lwm2mState) ->
{IPAddr, _} = Lwm2mState#lwm2m_state.peername,
IPAddrBin = iolist_to_binary(inet:ntoa(IPAddr)),
take_place(take_place(Text, <<"%a">>, IPAddrBin),
<<"%e">>, Lwm2mState#lwm2m_state.endpoint_name).
take_place(Text, Placeholder, Value) ->
binary:replace(Text, Placeholder, Value, [global]).
clientinfo(#lwm2m_state{peername = {PeerHost, _},
endpoint_name = EndpointName,
mountpoint = Mountpoint}) ->
#{zone => default,
listener => mqtt_tcp, %% FIXME: this won't work
protocol => lwm2m,
peerhost => PeerHost,
sockport => 5683, %% FIXME:
clientid => EndpointName,
username => undefined,
password => undefined,
peercert => nossl,
is_bridge => false,
is_superuser => false,
mountpoint => Mountpoint,
ws_cookie => undefined
}.
mountpoint(Topic, <<>>) ->
Topic;
mountpoint(Topic, Mountpoint) ->
<<Mountpoint/binary, Topic/binary>>.
%%--------------------------------------------------------------------
%% Helper funcs
-compile({inline, [run_hooks/2, run_hooks/3]}).
run_hooks(Name, Args) ->
ok = emqx_metrics:inc(Name), emqx_hooks:run(Name, Args).
run_hooks(Name, Args, Acc) ->
ok = emqx_metrics:inc(Name), emqx_hooks:run_fold(Name, Args, Acc).
%%--------------------------------------------------------------------
%% Info & Stats
info(State) ->
ChannInfo = chann_info(State),
ChannInfo#{sockinfo => sockinfo(State)}.
%% copies from emqx_connection:info/1
sockinfo(#lwm2m_state{peername = Peername}) ->
#{socktype => udp,
peername => Peername,
sockname => {{127,0,0,1}, 5683}, %% FIXME: Sock?
sockstate => running,
active_n => 1
}.
%% copies from emqx_channel:info/1
chann_info(State) ->
#{conninfo => conninfo(State),
conn_state => connected,
clientinfo => clientinfo(State),
session => maps:from_list(session_info(State)),
will_msg => undefined
}.
conninfo(#lwm2m_state{peername = Peername,
version = Ver,
started_at = StartedAt,
endpoint_name = Epn}) ->
#{socktype => udp,
sockname => {{127,0,0,1}, 5683},
peername => Peername,
peercert => nossl, %% TODO: dtls
conn_mod => ?MODULE,
proto_name => <<"LwM2M">>,
proto_ver => Ver,
clean_start => true,
clientid => Epn,
username => undefined,
conn_props => undefined,
connected => true,
connected_at => StartedAt,
keepalive => 0,
receive_maximum => 0,
expiry_interval => 0
}.
%% copies from emqx_session:info/1
session_info(#lwm2m_state{mqtt_topic = SubTopic, started_at = StartedAt}) ->
[{subscriptions, #{SubTopic => ?SUBOPTS}},
{upgrade_qos, false},
{retry_interval, 0},
{await_rel_timeout, 0},
{created_at, StartedAt}
].
%% The stats keys copied from emqx_connection:stats/1
stats(_State) ->
SockStats = [{recv_oct,0}, {recv_cnt,0}, {send_oct,0}, {send_cnt,0}, {send_pend,0}],
ConnStats = emqx_pd:get_counters(?CONN_STATS),
ChanStats = [{subscriptions_cnt, 1},
{subscriptions_max, 1},
{inflight_cnt, 0},
{inflight_max, 0},
{mqueue_len, 0},
{mqueue_max, 0},
{mqueue_dropped, 0},
{next_pkt_id, 0},
{awaiting_rel_cnt, 0},
{awaiting_rel_max, 0}
],
ProcStats = emqx_misc:proc_stats(),
lists:append([SockStats, ConnStats, ChanStats, ProcStats]).

View File

@ -1,47 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_lwm2m_timer).
-include("emqx_lwm2m.hrl").
-export([ cancel_timer/1
, start_timer/2
, refresh_timer/1
, refresh_timer/2
]).
-record(timer_state, { interval
, tref
, message
}).
-define(LOG(Level, Format, Args),
logger:Level("LWM2M-TIMER: " ++ Format, Args)).
cancel_timer(#timer_state{tref = TRef}) when is_reference(TRef) ->
_ = erlang:cancel_timer(TRef), ok.
refresh_timer(State=#timer_state{interval = Interval, message = Msg}) ->
cancel_timer(State), start_timer(Interval, Msg).
refresh_timer(NewInterval, State=#timer_state{message = Msg}) ->
cancel_timer(State), start_timer(NewInterval, Msg).
%% start timer in seconds
start_timer(Interval, Msg) ->
?LOG(debug, "start_timer of ~p secs", [Interval]),
TRef = erlang:send_after(timer:seconds(Interval), self(), Msg),
#timer_state{interval = Interval, tref = TRef, message = Msg}.