diff --git a/apps/emqx_resource/LICENSE b/apps/emqx_resource/LICENSE new file mode 100644 index 000000000..2f97fdd03 --- /dev/null +++ b/apps/emqx_resource/LICENSE @@ -0,0 +1,191 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + Copyright 2021, Shawn <506895667@qq.com>. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + diff --git a/apps/emqx_resource/Makefile b/apps/emqx_resource/Makefile new file mode 100644 index 000000000..596b9b2a1 --- /dev/null +++ b/apps/emqx_resource/Makefile @@ -0,0 +1,43 @@ +REBAR := rebar3 + +.PHONY: all +all: es + +.PHONY: compile +compile: + $(REBAR) compile + +.PHONY: clean +clean: distclean + +.PHONY: distclean +distclean: + @rm -rf _build erl_crash.dump rebar3.crashdump + +.PHONY: xref +xref: + $(REBAR) xref + +.PHONY: eunit +eunit: compile + $(REBAR) eunit -v -c + $(REBAR) cover + +.PHONY: ct +ct: compile + $(REBAR) as test ct -v + +cover: + $(REBAR) cover + +.PHONY: dialyzer +dialyzer: + $(REBAR) dialyzer + +.PHONY: es +es: compile + $(REBAR) escriptize + +.PHONY: elvis +elvis: + ./scripts/elvis-check.sh diff --git a/apps/emqx_resource/README.md b/apps/emqx_resource/README.md new file mode 100644 index 000000000..9e91b633c --- /dev/null +++ b/apps/emqx_resource/README.md @@ -0,0 +1,53 @@ +# emqx_resource + +The `emqx_resource` is an application that manages configuration specs and runtime states +for components that need to be configured and manipulated from the emqx-dashboard. + +It is intended to be used by resources, actions, acl, auth, backend_logics and more. + +It reads the configuration spec from *.spec (in HOCON format) and provide APIs for +creating, updating and destroying resource instances among all nodes in the cluster. + +It handles the problem like storing the configs and runtime states for both resource +and resource instances, and how porting them between different emqx_resource versions. + +It may maintain the config and data in JSON or HOCON files in data/ dir. + +After restarting the emqx_resource, it re-creates all the resource instances. + +There can be foreign references between resource instances via resource-id. +So they may find each other via this Id. + +## Try it out + + $ ./demo.sh + Eshell V11.1.8 (abort with ^G) + 1> == the demo log tracer <<"log_tracer_clientid_shawn">> started. + config: #{<<"config">> => + #{<<"bulk">> => <<"10KB">>,<<"cache_log_dir">> => <<"/tmp">>, + <<"condition">> => #{<<"clientid">> => <<"abc">>}, + <<"level">> => <<"debug">>}, + <<"id">> => <<"log_tracer_clientid_shawn">>, + <<"resource_type">> => <<"log_tracer">>} + 1> emqx_resource_instance:health_check(<<"log_tracer_clientid_shawn">>). + == the demo log tracer <<"log_tracer_clientid_shawn">> is working well + state: #{health_checked => 1,logger_handler_id => abc} + ok + + 2> emqx_resource_instance:health_check(<<"log_tracer_clientid_shawn">>). + == the demo log tracer <<"log_tracer_clientid_shawn">> is working well + state: #{health_checked => 2,logger_handler_id => abc} + ok + + 3> emqx_resource_instance:query(<<"log_tracer_clientid_shawn">>, get_log). + == the demo log tracer <<"log_tracer_clientid_shawn">> received request: get_log + state: #{health_checked => 2,logger_handler_id => abc} + "this is a demo log messages..." + + 4> emqx_resource_instance:remove(<<"log_tracer_clientid_shawn">>). + == the demo log tracer <<"log_tracer_clientid_shawn">> stopped. + state: #{health_checked => 0,logger_handler_id => abc} + ok + + 5> emqx_resource_instance:query(<<"log_tracer_clientid_shawn">>, get_log). + ** exception error: {get_instance,{<<"log_tracer_clientid_shawn">>,not_found}} diff --git a/apps/emqx_resource/demo.sh b/apps/emqx_resource/demo.sh new file mode 100755 index 000000000..19cbab809 --- /dev/null +++ b/apps/emqx_resource/demo.sh @@ -0,0 +1,6 @@ +#!/bin/sh +set -e + +rebar3 compile + +erl -sname abc -pa _build/default/lib/*/ebin _build/default/lib/emqx_resource/examples -s demo diff --git a/apps/emqx_resource/elvis.config b/apps/emqx_resource/elvis.config new file mode 100644 index 000000000..59aa13fbe --- /dev/null +++ b/apps/emqx_resource/elvis.config @@ -0,0 +1,14 @@ +[{elvis, [{config, [ + +#{dirs => ["src"], + filter => "*.erl", + %ignore => [], + ruleset => erl_files, + rules => [{elvis_style, operator_spaces, #{ + rules => [{right, ","}, + {right, "|"}, + {left, "|"}, + {right, "||"}, + {left, "||"}]}}, + {elvis_style, god_modules, #{limit => 100}}]} +]}]}]. diff --git a/apps/emqx_resource/examples/demo.erl b/apps/emqx_resource/examples/demo.erl new file mode 100644 index 000000000..171a80b61 --- /dev/null +++ b/apps/emqx_resource/examples/demo.erl @@ -0,0 +1,13 @@ +-module(demo). + +-export([start/0]). + +start() -> + code:load_file(log_tracer), + code:load_file(log_tracer_schema), + {ok, _} = application:ensure_all_started(minirest), + {ok, _} = application:ensure_all_started(emqx_resource), + emqx_resource:load_instances("./_build/default/lib/emqx_resource/examples"), + Handlers = [{"/", minirest:handler(#{modules => [log_tracer]})}], + Dispatch = [{"/[...]", minirest, Handlers}], + minirest:start_http(?MODULE, #{socket_opts => [inet, {port, 9900}]}, Dispatch). diff --git a/apps/emqx_resource/examples/demo.md b/apps/emqx_resource/examples/demo.md new file mode 100644 index 000000000..d7ff7f059 --- /dev/null +++ b/apps/emqx_resource/examples/demo.md @@ -0,0 +1,147 @@ +--- +theme: gaia +color: #000 +colorSecondary: #333 +backgroundColor: #fff +backgroundImage: url('https://marp.app/assets/hero-background.jpg') +paginate: true +marp: true +--- + + + +# EMQX Resource + +--- + +## What is it for + +The [emqx_resource](https://github.com/terry-xiaoyu/emqx_resource) for managing configurations and runtime states for dashboard components . + +![bg right](https://docs.emqx.cn/assets/img/rule_action_1@2x.73766093.png) + +--- + + + +# The Demo + +The little log tracer + +--- + +- The hocon schema file (log_tracer_schema.erl): + +https://github.com/terry-xiaoyu/emqx_resource/blob/main/examples/log_tracer_schema.erl + +- The callback file (log_tracer.erl): + +https://github.com/terry-xiaoyu/emqx_resource/blob/main/examples/log_tracer.erl + +--- + +Start the demo log tracer + +``` +./demo.sh +``` + +Load instance from config files (auto loaded) + +``` +## This will load all of the "*.conf" file under that directory: + +emqx_resource:load_instances("./_build/default/lib/emqx_resource/examples"). +``` + +The config file is validated against the schema (`*_schema.erl`) before loaded. + +--- + +# List Types and Instances + +- To list all the available resource types: + +``` +emqx_resource:list_types(). +emqx_resource:list_instances(). +``` + +- And there's `*_verbose` versions for these `list_*` APIs: + +``` +emqx_resource:list_types_verbose(). +emqx_resource:list_instances_verbose(). +``` + +--- +# Instance management + +- To get a resource types and instances: + +``` +emqx_resource:get_type(log_tracer). +emqx_resource:get_instance("log_tracer_clientid_shawn"). +``` + +- To create a resource instances: + +``` +emqx_resource:create("log_tracer2", log_tracer, +#{bulk => <<"1KB">>,cache_log_dir => <<"/tmp">>, + cache_logs_in => <<"memory">>,chars_limit => 1024, + condition => #{<<"app">> => <<"emqx">>}, + enable_cache => true,level => debug}). +``` + +--- + +- To update a resource: + +``` +emqx_resource:update("log_tracer2", log_tracer, #{bulk => <<"100KB">>}, []). +``` + +- To delete a resource: + +``` +emqx_resource:remove("log_tracer2"). +``` + +--- + + + +# HTTP APIs Demo + +--- + +# Get a log tracer + +To list current log tracers: + +``` +curl -s -XGET 'http://localhost:9900/log_tracer' | jq . +``` + +--- + +## Update or Create + +To update an existing log tracer or create a new one: + +``` +INST='{ + "resource_type": "log_tracer", + "config": { + "condition": { + "app": "emqx" + }, + "level": "debug", + "cache_log_dir": "/tmp", + "bulk": "10KB", + "chars_limit": 1024 + } +}' +curl -sv -XPUT 'http://localhost:9900/log_tracer/log_tracer2' -d $INST | jq . +``` diff --git a/apps/emqx_resource/examples/log_tracer.conf b/apps/emqx_resource/examples/log_tracer.conf new file mode 100644 index 000000000..7b438ec1f --- /dev/null +++ b/apps/emqx_resource/examples/log_tracer.conf @@ -0,0 +1,11 @@ +{ + "id": "log_tracer_clientid_shawn" + "resource_type": "log_tracer" + "config": { + "condition": {"app": "emqx"} + "level": "debug" + "cache_log_dir": "/tmp" + "bulk": "10KB" + "chars_limit": 1024 + } +} \ No newline at end of file diff --git a/apps/emqx_resource/examples/log_tracer.erl b/apps/emqx_resource/examples/log_tracer.erl new file mode 100644 index 000000000..99fc5bd3b --- /dev/null +++ b/apps/emqx_resource/examples/log_tracer.erl @@ -0,0 +1,45 @@ +-module(log_tracer). + +-include_lib("emqx_resource/include/emqx_resource_behaviour.hrl"). + +-emqx_resource_api_path("/log_tracer"). + +%% callbacks of behaviour emqx_resource +-export([ on_start/2 + , on_stop/2 + , on_query/4 + , on_health_check/2 + , on_api_reply_format/1 + , on_config_merge/3 + ]). + +%% callbacks for emqx_resource config schema +-export([fields/1]). + +fields(ConfPath) -> + log_tracer_schema:fields(ConfPath). + +on_start(InstId, Config) -> + io:format("== the demo log tracer ~p started.~nconfig: ~p~n", [InstId, Config]), + {ok, #{logger_handler_id => abc, health_checked => 0}}. + +on_stop(InstId, State) -> + io:format("== the demo log tracer ~p stopped.~nstate: ~p~n", [InstId, State]), + ok. + +on_query(InstId, Request, AfterQuery, State) -> + io:format("== the demo log tracer ~p received request: ~p~nstate: ~p~n", + [InstId, Request, State]), + emqx_resource:query_success(AfterQuery), + "this is a demo log messages...". + +on_health_check(InstId, State = #{health_checked := Checked}) -> + NState = State#{health_checked => Checked + 1}, + io:format("== the demo log tracer ~p is working well~nstate: ~p~n", [InstId, NState]), + {ok, NState}. + +on_api_reply_format(#{id := Id, status := Status, state := #{health_checked := NChecked}}) -> + #{id => Id, status => Status, checked_count => NChecked}. + +on_config_merge(OldConfig, NewConfig, _Params) -> + maps:merge(OldConfig, NewConfig). diff --git a/apps/emqx_resource/examples/log_tracer_schema.erl b/apps/emqx_resource/examples/log_tracer_schema.erl new file mode 100644 index 000000000..2b49aab7f --- /dev/null +++ b/apps/emqx_resource/examples/log_tracer_schema.erl @@ -0,0 +1,45 @@ +-module(log_tracer_schema). + +-include_lib("typerefl/include/types.hrl"). + +-export([fields/1]). + +-reflect_type([t_level/0, t_cache_logs_in/0]). + +-type t_level() :: debug | info | notice | warning | error | critical | alert | emergency. + +-type t_cache_logs_in() :: memory | file. + +fields("config") -> + [ {condition, fun condition/1} + , {level, fun level/1} + , {enable_cache, fun enable_cache/1} + , {cache_logs_in, fun cache_logs_in/1} + , {cache_log_dir, fun cache_log_dir/1} + , {bulk, fun bulk/1} + ]; +fields(_) -> []. + +condition(mapping) -> "config.condition"; +condition(type) -> map(); +condition(_) -> undefined. + +level(mapping) -> "config.level"; +level(type) -> t_level(); +level(_) -> undefined. + +enable_cache(mapping) -> "config.enable_cache"; +enable_cache(type) -> boolean(); +enable_cache(_) -> undefined. + +cache_logs_in(mapping) -> "config.cache_logs_in"; +cache_logs_in(type) -> t_cache_logs_in(); +cache_logs_in(_) -> undefined. + +cache_log_dir(mapping) -> "config.cache_log_dir"; +cache_log_dir(type) -> typerefl:regexp_string("^(.*)$"); +cache_log_dir(_) -> undefined. + +bulk(mapping) -> "config.bulk"; +bulk(type) -> typerefl:regexp_string("^[. 0-9]+(B|KB|MB|GB)$"); +bulk(_) -> undefined. diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl new file mode 100644 index 000000000..429b83c45 --- /dev/null +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -0,0 +1,19 @@ +-type resource_type() :: module(). +-type instance_id() :: binary(). +-type resource_config() :: jsx:json_term(). +-type resource_spec() :: map(). +-type resource_state() :: term(). +-type resource_data() :: #{ + id => instance_id(), + mod => module(), + config => resource_config(), + state => resource_state(), + status => started | stopped +}. + +-type after_query() :: {OnSuccess :: after_query_fun(), OnFailed :: after_query_fun()} | + undefined. + +%% the `after_query_fun()` is mainly for callbacks that increment counters or do some fallback +%% actions upon query failure +-type after_query_fun() :: {fun((...) -> ok), Args :: [term()]}. diff --git a/apps/emqx_resource/include/emqx_resource_behaviour.hrl b/apps/emqx_resource/include/emqx_resource_behaviour.hrl new file mode 100644 index 000000000..bf0f1cd2e --- /dev/null +++ b/apps/emqx_resource/include/emqx_resource_behaviour.hrl @@ -0,0 +1,3 @@ +-include_lib("emqx_resource/include/emqx_resource.hrl"). +-behaviour(emqx_resource). +-compile({parse_transform, emqx_resource_transform}). diff --git a/apps/emqx_resource/include/emqx_resource_utils.hrl b/apps/emqx_resource/include/emqx_resource_utils.hrl new file mode 100644 index 000000000..dd1517428 --- /dev/null +++ b/apps/emqx_resource/include/emqx_resource_utils.hrl @@ -0,0 +1,38 @@ +-define(CLUSTER_CALL(Func, Args), ?CLUSTER_CALL(Func, Args, ok)). + +-define(CLUSTER_CALL(Func, Args, ResParttern), +%% ekka_mnesia:running_nodes() + fun() -> + case LocalResult = erlang:apply(?MODULE, Func, Args) of + ResParttern -> + case rpc:multicall(nodes(), ?MODULE, Func, Args, 5000) of + {ResL, []} -> + Filter = fun + (ResParttern) -> false; + ({badrpc, {'EXIT', {undef, [{?MODULE, Func0, _, []}]}}}) + when Func0 =:= Func -> false; + (_) -> true + end, + case lists:filter(Filter, ResL) of + [] -> LocalResult; + ErrL -> {error, ErrL} + end; + {ResL, BadNodes} -> + {error, {failed_on_nodes, BadNodes, ResL}} + end; + ErrorResult -> + {error, ErrorResult} + end + end()). + +-define(SAFE_CALL(_EXP_), + ?SAFE_CALL(_EXP_, _ = do_nothing)). + +-define(SAFE_CALL(_EXP_, _EXP_ON_FAIL_), + fun() -> + try (_EXP_) + catch _EXCLASS_:_EXCPTION_:_ST_ -> + _EXP_ON_FAIL_, + {error, {_EXCLASS_, _EXCPTION_, _ST_}} + end + end()). \ No newline at end of file diff --git a/apps/emqx_resource/rebar.config b/apps/emqx_resource/rebar.config new file mode 100644 index 000000000..83ac89bc3 --- /dev/null +++ b/apps/emqx_resource/rebar.config @@ -0,0 +1,14 @@ +{erl_opts, [ debug_info + %, {d, 'RESOURCE_DEBUG'} + ]}. + +{erl_first_files, ["src/emqx_resource_transform.erl"]}. + +{extra_src_dirs, ["examples"]}. + +{deps, [ {hocon, {git, "https://github.com/emqx/hocon", {branch, "master"}}} + , {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}} + , {minirest, {git, "https://github.com/emqx/minirest", {tag, "0.3.5"}}} + , {jsx, {git, "https://github.com/talentdeficit/jsx", {tag, "v3.1.0"}}} + ]}. + diff --git a/apps/emqx_resource/scripts/elvis-check.sh b/apps/emqx_resource/scripts/elvis-check.sh new file mode 100755 index 000000000..3fae0f191 --- /dev/null +++ b/apps/emqx_resource/scripts/elvis-check.sh @@ -0,0 +1,17 @@ +#!/bin/bash + +set -euo pipefail + +ELVIS_VERSION='1.0.0-emqx-2' + +elvis_version="${2:-$ELVIS_VERSION}" + +echo "elvis -v: $elvis_version" + +if [ ! -f ./elvis ] || [ "$(./elvis -v | grep -oE '[1-9]+\.[0-9]+\.[0-9]+\-emqx-[0-9]+')" != "$elvis_version" ]; then + curl -fLO "https://github.com/emqx/elvis/releases/download/$elvis_version/elvis" + chmod +x ./elvis +fi + +./elvis rock --config elvis.config + diff --git a/apps/emqx_resource/src/emqx_resource.app.src b/apps/emqx_resource/src/emqx_resource.app.src new file mode 100644 index 000000000..af9f48cc6 --- /dev/null +++ b/apps/emqx_resource/src/emqx_resource.app.src @@ -0,0 +1,17 @@ +{application, emqx_resource, + [{description, "An OTP application"}, + {vsn, "0.1.0"}, + {registered, []}, + {mod, {emqx_resource_app, []}}, + {applications, + [kernel, + stdlib, + gproc, + hocon + ]}, + {env,[]}, + {modules, []}, + + {licenses, ["Apache 2.0"]}, + {links, []} + ]}. diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl new file mode 100644 index 000000000..b9107d328 --- /dev/null +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -0,0 +1,274 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_resource). + +-include("emqx_resource.hrl"). +-include("emqx_resource_utils.hrl"). + +%% APIs for resource types + +-export([ get_type/1 + , list_types/0 + , list_types_verbose/0 + ]). + +-export([ discover_resource_mods/0 + , is_resource_mod/1 + , call_instance/2 + ]). + +-export([ query_success/1 + , query_failed/1 + ]). + +%% APIs for instances + +-export([ parse_config/2 + , resource_type_from_str/1 + ]). + +%% Sync resource instances and files +%% provisional solution: rpc:multical to all the nodes for creating/updating/removing +%% todo: replicate operations +-export([ create/3 %% store the config and start the instance + , create_dry_run/3 %% run start/2, health_check/2 and stop/1 sequentially + , update/4 %% update the config, stop the old instance and start the new one + %% it will create a new resource when the id does not exist + , remove/1 %% remove the config and stop the instance + ]). + +%% Calls to the callback module with current resource state +%% They also save the state after the call finished (except query/2,3). +-export([ restart/1 %% restart the instance. + , health_check/1 %% verify if the resource is working normally + , stop/1 %% stop the instance + , query/2 %% query the instance + , query/3 %% query the instance with after_query() + ]). + +%% Direct calls to the callback module +-export([ call_start/3 %% start the instance + , call_health_check/3 %% verify if the resource is working normally + , call_stop/3 %% stop the instance + , call_config_merge/4 %% merge the config when updating + ]). + +-export([ list_instances/0 %% list all the instances, id only. + , list_instances_verbose/0 %% list all the instances + , get_instance/1 %% return the data of the instance + , get_instance_by_type/1 %% return all the instances of the same resource type + , load_instances/1 %% load instances from config files + % , dependents/1 + % , inc_counter/2 %% increment the counter of the instance + % , inc_counter/3 %% increment the counter by a given integer + ]). + +-define(EXT, "*.spec"). + +-optional_callbacks([ on_query/4 + , on_health_check/2 + , on_api_reply_format/1 + , on_config_merge/3 + ]). + +-callback on_api_reply_format(resource_data()) -> map(). + +-callback on_config_merge(resource_config(), resource_config(), term()) -> resource_config(). + +%% when calling emqx_resource:start/1 +-callback on_start(instance_id(), resource_config()) -> + {ok, resource_state()} | {error, Reason :: term()}. + +%% when calling emqx_resource:stop/1 +-callback on_stop(instance_id(), resource_state()) -> term(). + +%% when calling emqx_resource:query/3 +-callback on_query(instance_id(), Request :: term(), after_query(), resource_state()) -> term(). + +%% when calling emqx_resource:health_check/2 +-callback on_health_check(instance_id(), resource_state()) -> + {ok, resource_state()} | {error, Reason:: term(), resource_state()}. + +%% load specs and return the loaded resources this time. +-spec list_types_verbose() -> [resource_spec()]. +list_types_verbose() -> + [get_spec(Mod) || Mod <- list_types()]. + +-spec list_types() -> [module()]. +list_types() -> + discover_resource_mods(). + +-spec get_type(module()) -> {ok, resource_spec()} | {error, not_found}. +get_type(Mod) -> + case is_resource_mod(Mod) of + true -> {ok, get_spec(Mod)}; + false -> {error, not_found} + end. + +-spec get_spec(module()) -> resource_spec(). +get_spec(Mod) -> + maps:put(<<"resource_type">>, Mod, Mod:emqx_resource_schema()). + +-spec discover_resource_mods() -> [module()]. +discover_resource_mods() -> + [Mod || {Mod, _} <- code:all_loaded(), is_resource_mod(Mod)]. + +-spec is_resource_mod(module()) -> boolean(). +is_resource_mod(Mod) -> + erlang:function_exported(Mod, emqx_resource_schema, 0). + +-spec query_success(after_query()) -> ok. +query_success(undefined) -> ok; +query_success({{OnSucc, Args}, _}) -> + safe_apply(OnSucc, Args). + +-spec query_failed(after_query()) -> ok. +query_failed(undefined) -> ok; +query_failed({_, {OnFailed, Args}}) -> + safe_apply(OnFailed, Args). + +%% ================================================================================= +%% APIs for resource instances +%% ================================================================================= +-spec create(instance_id(), resource_type(), resource_config()) -> + {ok, resource_data()} | {error, Reason :: term()}. +create(InstId, ResourceType, Config) -> + ?CLUSTER_CALL(call_instance, [InstId, {create, InstId, ResourceType, Config}], {ok, _}). + +-spec create_dry_run(instance_id(), resource_type(), resource_config()) -> + ok | {error, Reason :: term()}. +create_dry_run(InstId, ResourceType, Config) -> + ?CLUSTER_CALL(call_instance, [InstId, {create_dry_run, InstId, ResourceType, Config}]). + +-spec update(instance_id(), resource_type(), resource_config(), term()) -> + {ok, resource_data()} | {error, Reason :: term()}. +update(InstId, ResourceType, Config, Params) -> + ?CLUSTER_CALL(call_instance, [InstId, {update, InstId, ResourceType, Config, Params}], {ok, _}). + +-spec remove(instance_id()) -> ok | {error, Reason :: term()}. +remove(InstId) -> + ?CLUSTER_CALL(call_instance, [InstId, {remove, InstId}]). + +-spec query(instance_id(), Request :: term()) -> Result :: term(). +query(InstId, Request) -> + query(InstId, Request, undefined). + +%% same to above, also defines what to do when the Module:on_query success or failed +%% it is the duty of the Moudle to apply the `after_query()` functions. +-spec query(instance_id(), Request :: term(), after_query()) -> Result :: term(). +query(InstId, Request, AfterQuery) -> + case get_instance(InstId) of + {ok, #{mod := Mod, state := ResourceState}} -> + %% the resource state is readonly to Moudle:on_query/4 + %% and the `after_query()` functions should be thread safe + Mod:on_query(InstId, Request, AfterQuery, ResourceState); + {error, Reason} -> + error({get_instance, {InstId, Reason}}) + end. + +-spec restart(instance_id()) -> ok | {error, Reason :: term()}. +restart(InstId) -> + call_instance(InstId, {restart, InstId}). + +-spec stop(instance_id()) -> ok | {error, Reason :: term()}. +stop(InstId) -> + call_instance(InstId, {stop, InstId}). + +-spec health_check(instance_id()) -> ok | {error, Reason :: term()}. +health_check(InstId) -> + call_instance(InstId, {health_check, InstId}). + +-spec get_instance(instance_id()) -> {ok, resource_data()} | {error, Reason :: term()}. +get_instance(InstId) -> + emqx_resource_instance:lookup(InstId). + +-spec list_instances() -> [instance_id()]. +list_instances() -> + [Id || #{id := Id} <- list_instances_verbose()]. + +-spec list_instances_verbose() -> [resource_data()]. +list_instances_verbose() -> + emqx_resource_instance:list_all(). + +-spec get_instance_by_type(module()) -> [resource_data()]. +get_instance_by_type(ResourceType) -> + emqx_resource_instance:lookup_by_type(ResourceType). + +-spec load_instances(Dir :: string()) -> ok. +load_instances(Dir) -> + emqx_resource_instance:load(Dir). + +-spec call_start(instance_id(), module(), resource_config()) -> + {ok, resource_state()} | {error, Reason :: term()}. +call_start(InstId, Mod, Config) -> + ?SAFE_CALL(Mod:on_start(InstId, Config)). + +-spec call_health_check(instance_id(), module(), resource_state()) -> + {ok, resource_state()} | {error, Reason:: term(), resource_state()}. +call_health_check(InstId, Mod, ResourceState) -> + ?SAFE_CALL(Mod:on_health_check(InstId, ResourceState)). + +-spec call_stop(instance_id(), module(), resource_state()) -> term(). +call_stop(InstId, Mod, ResourceState) -> + ?SAFE_CALL(Mod:on_stop(InstId, ResourceState)). + +-spec call_config_merge(module(), resource_config(), resource_config(), term()) -> + resource_config(). +call_config_merge(Mod, OldConfig, NewConfig, Params) -> + ?SAFE_CALL(Mod:on_config_merge(OldConfig, NewConfig, Params)). + +-spec parse_config(resource_type(), binary() | term()) -> + {ok, resource_config()} | {error, term()}. +parse_config(ResourceType, RawConfig) when is_binary(RawConfig) -> + case hocon:binary(RawConfig, #{format => richmap}) of + {ok, MapConfig} -> + do_parse_config(ResourceType, MapConfig); + Error -> Error + end; +parse_config(ResourceType, RawConfigTerm) -> + parse_config(ResourceType, jsx:encode(#{<<"config">> => RawConfigTerm})). + +-spec do_parse_config(resource_type(), map()) -> {ok, resource_config()} | {error, term()}. +do_parse_config(ResourceType, MapConfig) -> + case ?SAFE_CALL(hocon_schema:generate(ResourceType, MapConfig)) of + {error, Reason} -> {error, Reason}; + Config -> + InstConf = maps:from_list(proplists:get_value(config, Config)), + {ok, InstConf} + end. + +%% ================================================================================= + +-spec resource_type_from_str(string()) -> {ok, resource_type()} | {error, term()}. +resource_type_from_str(ResourceType) -> + try Mod = list_to_existing_atom(str(ResourceType)), + case emqx_resource:is_resource_mod(Mod) of + true -> {ok, Mod}; + false -> {error, {invalid_resource, Mod}} + end + catch error:badarg -> + {error, {not_found, ResourceType}} + end. + +call_instance(InstId, Query) -> + emqx_resource_instance:hash_call(InstId, Query). + +safe_apply(Func, Args) -> + ?SAFE_CALL(erlang:apply(Func, Args)). + +str(S) when is_binary(S) -> binary_to_list(S); +str(S) when is_list(S) -> S. diff --git a/apps/emqx_resource/src/emqx_resource_api.erl b/apps/emqx_resource/src/emqx_resource_api.erl new file mode 100644 index 000000000..82652039d --- /dev/null +++ b/apps/emqx_resource/src/emqx_resource_api.erl @@ -0,0 +1,64 @@ +-module(emqx_resource_api). + +-export([ get_all/3 + , get/3 + , put/3 + , delete/3 + ]). +get_all(Mod, _Binding, _Params) -> + {200, #{code => 0, data => + [format_data(Mod, Data) || Data <- emqx_resource:list_instances_verbose()]}}. + +get(Mod, #{id := Id}, _Params) -> + case emqx_resource:get_instance(stringnify(Id)) of + {ok, Data} -> + {200, #{code => 0, data => format_data(Mod, Data)}}; + {error, not_found} -> + {404, #{code => 102, message => {resource_instance_not_found, stringnify(Id)}}} + end. + +put(Mod, #{id := Id}, Params) -> + ConfigParams = proplists:get_value(<<"config">>, Params), + ResourceTypeStr = proplists:get_value(<<"resource_type">>, Params), + case emqx_resource:resource_type_from_str(ResourceTypeStr) of + {ok, ResourceType} -> + do_put(Mod, stringnify(Id), ConfigParams, ResourceType, Params); + {error, Reason} -> + {404, #{code => 102, message => stringnify(Reason)}} + end. + +do_put(Mod, Id, ConfigParams, ResourceType, Params) -> + case emqx_resource:parse_config(ResourceType, ConfigParams) of + {ok, Config} -> + case emqx_resource:update(Id, ResourceType, Config, Params) of + {ok, Data} -> + {200, #{code => 0, data => format_data(Mod, Data)}}; + {error, Reason} -> + {500, #{code => 102, message => stringnify(Reason)}} + end; + {error, Reason} -> + {400, #{code => 108, message => stringnify(Reason)}} + end. + +delete(_Mod, #{id := Id}, _Params) -> + case emqx_resource:remove(stringnify(Id)) of + ok -> {200, #{code => 0, data => #{}}}; + {error, Reason} -> + {500, #{code => 102, message => stringnify(Reason)}} + end. + +format_data(Mod, Data) -> + case erlang:function_exported(Mod, on_api_reply_format, 1) of + false -> + default_api_reply_format(Data); + true -> + Mod:on_api_reply_format(Data) + end. + +default_api_reply_format(#{id := Id, status := Status, config := Config}) -> + #{node => node(), id => Id, status => Status, config => Config}. + +stringnify(Bin) when is_binary(Bin) -> Bin; +stringnify(Str) when is_list(Str) -> list_to_binary(Str); +stringnify(Reason) -> + iolist_to_binary(io_lib:format("~p", [Reason])). diff --git a/apps/emqx_resource/src/emqx_resource_app.erl b/apps/emqx_resource/src/emqx_resource_app.erl new file mode 100644 index 000000000..e3ef9f3d2 --- /dev/null +++ b/apps/emqx_resource/src/emqx_resource_app.erl @@ -0,0 +1,31 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_resource_app). + +-behaviour(application). + +-include("emqx_resource.hrl"). + +-export([start/2, stop/1]). + +start(_StartType, _StartArgs) -> + emqx_resource_sup:start_link(). + +stop(_State) -> + ok. + +%% internal functions diff --git a/apps/emqx_resource/src/emqx_resource_instance.erl b/apps/emqx_resource/src/emqx_resource_instance.erl new file mode 100644 index 000000000..d47ec18df --- /dev/null +++ b/apps/emqx_resource/src/emqx_resource_instance.erl @@ -0,0 +1,294 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_resource_instance). + +-behaviour(gen_server). + +-include("emqx_resource.hrl"). +-include("emqx_resource_utils.hrl"). + +-export([start_link/2]). + +%% load resource instances from *.conf files +-export([ load/1 + , lookup/1 + , list_all/0 + , lookup_by_type/1 + ]). + +-export([ hash_call/2 + , hash_call/3 + ]). + +%% gen_server Callbacks +-export([ init/1 + , handle_call/3 + , handle_cast/2 + , handle_info/2 + , terminate/2 + , code_change/3 + ]). + +-record(state, {worker_pool, worker_id}). + +-type state() :: #state{}. + +%%------------------------------------------------------------------------------ +%% Start the registry +%%------------------------------------------------------------------------------ + +start_link(Pool, Id) -> + gen_server:start_link({local, proc_name(?MODULE, Id)}, + ?MODULE, {Pool, Id}, []). + +%% call the worker by the hash of resource-instance-id, to make sure we always handle +%% operations on the same instance in the same worker. +hash_call(InstId, Request) -> + hash_call(InstId, Request, infinity). + +hash_call(InstId, Request, Timeout) -> + gen_server:call(pick(InstId), Request, Timeout). + +-spec lookup(instance_id()) -> {ok, resource_data()} | {error, Reason :: term()}. +lookup(InstId) -> + case ets:lookup(emqx_resource_instance, InstId) of + [] -> {error, not_found}; + [{_, Data}] -> {ok, Data#{id => InstId}} + end. + +force_lookup(InstId) -> + {ok, Data} = lookup(InstId), + Data. + +-spec list_all() -> [resource_data()]. +list_all() -> + [Data#{id => Id} || {Id, Data} <- ets:tab2list(emqx_resource_instance)]. + +-spec lookup_by_type(module()) -> [resource_data()]. +lookup_by_type(ResourceType) -> + [Data || #{mod := Mod} = Data <- list_all() + , Mod =:= ResourceType]. + +-spec load(Dir :: string()) -> ok. +load(Dir) -> + lists:foreach(fun load_file/1, filelib:wildcard(filename:join([Dir, "*.conf"]))). + +load_file(File) -> + case ?SAFE_CALL(hocon_token:read(File)) of + {error, Reason} -> + logger:error("load resource from ~p failed: ~p", [File, Reason]); + RawConfig -> + case hocon:binary(RawConfig, #{format => map}) of + {ok, #{<<"id">> := Id, <<"resource_type">> := ResourceTypeStr, + <<"config">> := MapConfig}} -> + case emqx_resource:resource_type_from_str(ResourceTypeStr) of + {ok, ResourceType} -> + parse_and_load_config(Id, ResourceType, MapConfig); + {error, Reason} -> + logger:error("no such resource type: ~s, ~p", + [ResourceTypeStr, Reason]) + end; + {error, Reason} -> + logger:error("load resource from ~p failed: ~p", [File, Reason]) + end + end. + +parse_and_load_config(InstId, ResourceType, MapConfig) -> + case emqx_resource:parse_config(ResourceType, MapConfig) of + {error, Reason} -> + logger:error("parse config for resource ~p of type ~p failed: ~p", + [InstId, ResourceType, Reason]); + {ok, InstConf} -> + create_instance_local(InstId, ResourceType, InstConf) + end. + +create_instance_local(InstId, ResourceType, InstConf) -> + case do_create(InstId, ResourceType, InstConf) of + {ok, Data} -> + logger:debug("created ~p resource instance: ~p from config: ~p, Data: ~p", + [ResourceType, InstId, InstConf, Data]); + {error, Reason} -> + logger:error("create ~p resource instance: ~p failed: ~p, config: ~p", + [ResourceType, InstId, Reason, InstConf]) + end. + +%%------------------------------------------------------------------------------ +%% gen_server callbacks +%%------------------------------------------------------------------------------ + +-spec init({atom(), integer()}) -> + {ok, State :: state()} | {ok, State :: state(), timeout() | hibernate | {continue, term()}} | + {stop, Reason :: term()} | ignore. +init({Pool, Id}) -> + true = gproc_pool:connect_worker(Pool, {Pool, Id}), + {ok, #state{worker_pool = Pool, worker_id = Id}}. + +handle_call({create, InstId, ResourceType, Config}, _From, State) -> + {reply, do_create(InstId, ResourceType, Config), State}; + +handle_call({create_dry_run, InstId, ResourceType, Config}, _From, State) -> + {reply, do_create_dry_run(InstId, ResourceType, Config), State}; + +handle_call({update, InstId, ResourceType, Config, Params}, _From, State) -> + {reply, do_update(InstId, ResourceType, Config, Params), State}; + +handle_call({remove, InstId}, _From, State) -> + {reply, do_remove(InstId), State}; + +handle_call({restart, InstId}, _From, State) -> + {reply, do_restart(InstId), State}; + +handle_call({stop, InstId}, _From, State) -> + {reply, do_stop(InstId), State}; + +handle_call({health_check, InstId}, _From, State) -> + {reply, do_health_check(InstId), State}; + +handle_call(Req, _From, State) -> + logger:error("Received unexpected call: ~p", [Req]), + {reply, ignored, State}. + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, #state{worker_pool = Pool, worker_id = Id}) -> + gproc_pool:disconnect_worker(Pool, {Pool, Id}). + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%------------------------------------------------------------------------------ + +do_update(InstId, ResourceType, NewConfig, Params) -> + case lookup(InstId) of + {ok, #{mod := ResourceType, state := ResourceState, config := OldConfig}} -> + Config = emqx_resource:call_config_merge(ResourceType, OldConfig, + NewConfig, Params), + case do_create_dry_run(InstId, ResourceType, Config) of + ok -> + do_remove(ResourceType, InstId, ResourceState), + do_create(InstId, ResourceType, Config); + Error -> + Error + end; + {ok, #{mod := Mod}} when Mod =/= ResourceType -> + {error, updating_to_incorrect_resource_type}; + {error, not_found} -> + do_create(InstId, ResourceType, NewConfig) + end. + +do_create(InstId, ResourceType, Config) -> + case lookup(InstId) of + {ok, _} -> {error, already_created}; + _ -> + case emqx_resource:call_start(InstId, ResourceType, Config) of + {ok, ResourceState} -> + ets:insert(emqx_resource_instance, {InstId, + #{mod => ResourceType, config => Config, + state => ResourceState, status => stopped}}), + _ = do_health_check(InstId), + {ok, force_lookup(InstId)}; + {error, Reason} -> + logger:error("start ~s resource ~s failed: ~p", [ResourceType, InstId, Reason]), + {error, Reason} + end + end. + +do_create_dry_run(InstId, ResourceType, Config) -> + case emqx_resource:call_start(InstId, ResourceType, Config) of + {ok, ResourceState0} -> + Return = case emqx_resource:call_health_check(InstId, ResourceType, ResourceState0) of + {ok, ResourceState1} -> ok; + {error, Reason, ResourceState1} -> + {error, Reason} + end, + _ = emqx_resource:call_stop(InstId, ResourceType, ResourceState1), + Return; + {error, Reason} -> + {error, Reason} + end. + +do_remove(InstId) -> + case lookup(InstId) of + {ok, #{mod := Mod, state := ResourceState}} -> + do_remove(Mod, InstId, ResourceState); + Error -> + Error + end. + +do_remove(Mod, InstId, ResourceState) -> + _ = emqx_resource:call_stop(InstId, Mod, ResourceState), + ets:delete(emqx_resource_instance, InstId), + ok. + +do_restart(InstId) -> + case lookup(InstId) of + {ok, #{mod := Mod, state := ResourceState, config := Config} = Data} -> + _ = emqx_resource:call_stop(InstId, Mod, ResourceState), + case emqx_resource:call_start(InstId, Mod, Config) of + {ok, ResourceState} -> + ets:insert(emqx_resource_instance, + {InstId, Data#{state => ResourceState, status => started}}), + ok; + {error, Reason} -> + ets:insert(emqx_resource_instance, {InstId, Data#{status => stopped}}), + {error, Reason} + end; + Error -> + Error + end. + +do_stop(InstId) -> + case lookup(InstId) of + {ok, #{mod := Mod, state := ResourceState} = Data} -> + _ = emqx_resource:call_stop(InstId, Mod, ResourceState), + ets:insert(emqx_resource_instance, {InstId, Data#{status => stopped}}), + ok; + Error -> + Error + end. + +do_health_check(InstId) -> + case lookup(InstId) of + {ok, #{mod := Mod, state := ResourceState0} = Data} -> + case emqx_resource:call_health_check(InstId, Mod, ResourceState0) of + {ok, ResourceState1} -> + ets:insert(emqx_resource_instance, + {InstId, Data#{status => started, state => ResourceState1}}), + ok; + {error, Reason, ResourceState1} -> + logger:error("health check for ~p failed: ~p", [InstId, Reason]), + ets:insert(emqx_resource_instance, + {InstId, Data#{status => stopped, state => ResourceState1}}), + {error, Reason} + end; + Error -> + Error + end. + +%%------------------------------------------------------------------------------ +%% internal functions +%%------------------------------------------------------------------------------ + +proc_name(Mod, Id) -> + list_to_atom(lists:concat([Mod, "_", Id])). + +pick(InstId) -> + gproc_pool:pick_worker(emqx_resource_instance, InstId). diff --git a/apps/emqx_resource/src/emqx_resource_sup.erl b/apps/emqx_resource/src/emqx_resource_sup.erl new file mode 100644 index 000000000..275de6dec --- /dev/null +++ b/apps/emqx_resource/src/emqx_resource_sup.erl @@ -0,0 +1,48 @@ +%%%------------------------------------------------------------------- +%% @doc emqx_resource top level supervisor. +%% @end +%%%------------------------------------------------------------------- + +-module(emqx_resource_sup). + +-behaviour(supervisor). + +-export([start_link/0]). + +-export([init/1]). + +-define(RESOURCE_INST_MOD, emqx_resource_instance). +-define(POOL_SIZE, 64). %% set a very large pool size in case all the workers busy + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +init([]) -> + TabOpts = [named_table, set, public, {read_concurrency, true}], + _ = ets:new(emqx_resource_instance, TabOpts), + + SupFlags = #{strategy => one_for_one, intensity => 10, period => 10}, + Pool = ?RESOURCE_INST_MOD, + Mod = ?RESOURCE_INST_MOD, + ensure_pool(Pool, hash, [{size, ?POOL_SIZE}]), + {ok, {SupFlags, [ + begin + ensure_pool_worker(Pool, {Pool, Idx}, Idx), + #{id => {Mod, Idx}, + start => {Mod, start_link, [Pool, Idx]}, + restart => transient, + shutdown => 5000, type => worker, modules => [Mod]} + end || Idx <- lists:seq(1, ?POOL_SIZE)]}}. + +%% internal functions +ensure_pool(Pool, Type, Opts) -> + try gproc_pool:new(Pool, Type, Opts) + catch + error:exists -> ok + end. + +ensure_pool_worker(Pool, Name, Slot) -> + try gproc_pool:add_worker(Pool, Name, Slot) + catch + error:exists -> ok + end. \ No newline at end of file diff --git a/apps/emqx_resource/src/emqx_resource_transform.erl b/apps/emqx_resource/src/emqx_resource_transform.erl new file mode 100644 index 000000000..844350278 --- /dev/null +++ b/apps/emqx_resource/src/emqx_resource_transform.erl @@ -0,0 +1,99 @@ +-module(emqx_resource_transform). + +-include_lib("syntax_tools/include/merl.hrl"). + +-export([parse_transform/2]). + +parse_transform(Forms, _Opts) -> + Mod = hd([M || {attribute, _, module, M} <- Forms]), + AST = trans(Mod, proplists:delete(eof, Forms)), + debug_print(Mod, AST), + AST. + +-ifdef(RESOURCE_DEBUG). + +debug_print(Mod, Ts) -> + {ok, Io} = file:open("./" ++ atom_to_list(Mod) ++ ".trans.erl", [write]), + do_debug_print(Io, Ts), + file:close(Io). + +do_debug_print(Io, Ts) when is_list(Ts) -> + lists:foreach(fun(T) -> do_debug_print(Io, T) end, Ts); +do_debug_print(Io, T) -> + io:put_chars(Io, erl_prettypr:format(merl:tree(T))), + io:nl(Io). +-else. +debug_print(_Mod, _AST) -> + ok. +-endif. + +trans(Mod, Forms) -> + forms(Mod, Forms) ++ [erl_syntax:revert(erl_syntax:eof_marker())]. + +forms(Mod, [F0 | Fs0]) -> + case form(Mod, F0) of + {CurrForm, AppendedForms} -> + CurrForm ++ forms(Mod, Fs0) ++ AppendedForms; + {AHeadForms, CurrForm, AppendedForms} -> + AHeadForms ++ CurrForm ++ forms(Mod, Fs0) ++ AppendedForms + end; +forms(_, []) -> []. + +form(Mod, Form) -> + case Form of + ?Q("-emqx_resource_api_path('@Path').") -> + {fix_spec_attrs() ++ fix_api_attrs(erl_syntax:concrete(Path)) ++ fix_api_exports(), + [], + fix_spec_funcs(Mod) ++ fix_api_funcs(Mod)}; + _ -> + %io:format("---other form: ~p~n", [Form]), + {[], [Form], []} + end. + +fix_spec_attrs() -> + [ ?Q("-export([emqx_resource_schema/0]).") + , ?Q("-export([structs/0]).") + , ?Q("-behaviour(hocon_schema).") + ]. +fix_spec_funcs(_Mod) -> + [ (?Q("emqx_resource_schema() -> <<\"demo_swagger_schema\">>.")) + , ?Q("structs() -> [\"config\"].") + ]. + +fix_api_attrs(Path0) -> + BaseName = filename:basename(Path0), + Path = "/" ++ BaseName, + [erl_syntax:revert( + erl_syntax:attribute(?Q("rest_api"), [ + erl_syntax:abstract(#{ + name => list_to_atom(Name ++ "_log_tracers"), + method => Method, + path => mk_path(Path, WithId), + func => Func, + descr => Name ++ " the " ++ BaseName})])) + || {Name, Method, WithId, Func} <- [ + {"list", 'GET', noid, api_get_all}, + {"get", 'GET', id, api_get}, + {"update", 'PUT', id, api_put}, + {"delete", 'DELETE', id, api_delete}]]. + +fix_api_exports() -> + [?Q("-export([api_get_all/2, api_get/2, api_put/2, api_delete/2]).")]. + +fix_api_funcs(Mod) -> + [erl_syntax:revert(?Q( + "api_get_all(Binding, Params) -> + emqx_resource_api:get_all('@Mod@', Binding, Params).")), + erl_syntax:revert(?Q( + "api_get(Binding, Params) -> + emqx_resource_api:get('@Mod@', Binding, Params).")), + erl_syntax:revert(?Q( + "api_put(Binding, Params) -> + emqx_resource_api:put('@Mod@', Binding, Params).")), + erl_syntax:revert(?Q( + "api_delete(Binding, Params) -> + emqx_resource_api:delete('@Mod@', Binding, Params).")) + ]. + +mk_path(Path, id) -> Path ++ "/:bin:id"; +mk_path(Path, noid) -> Path. diff --git a/apps/emqx_resource/src/emqx_resource_uitils.erl b/apps/emqx_resource/src/emqx_resource_uitils.erl new file mode 100644 index 000000000..b49eeb927 --- /dev/null +++ b/apps/emqx_resource/src/emqx_resource_uitils.erl @@ -0,0 +1 @@ +-module(emqx_resource_uitils). \ No newline at end of file diff --git a/apps/emqx_resource/src/emqx_resource_validator.erl b/apps/emqx_resource/src/emqx_resource_validator.erl new file mode 100644 index 000000000..e9517f160 --- /dev/null +++ b/apps/emqx_resource/src/emqx_resource_validator.erl @@ -0,0 +1,63 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_resource_validator). + +-export([ min/2 + , max/2 + , equals/2 + , enum/1 + , required/1 + ]). + +max(Type, Max) -> + limit(Type, '=<', Max). + +min(Type, Min) -> + limit(Type, '>=', Min). + +equals(Type, Expected) -> + limit(Type, '==', Expected). + +enum(Items) -> + fun(Value) -> + return(lists:member(Value, Items), + err_limit({enum, {is_member_of, Items}, {got, Value}})) + end. + +required(ErrMsg) -> + fun(undefined) -> {error, ErrMsg}; + (_) -> ok + end. + +limit(Type, Op, Expected) -> + L = len(Type), + fun(Value) -> + Got = L(Value), + return(erlang:Op(Got, Expected), + err_limit({Type, {Op, Expected}, {got, Got}})) + end. + +len(array) -> fun erlang:length/1; +len(string) -> fun string:length/1; +len(_Type) -> fun(Val) -> Val end. + +err_limit({Type, {Op, Expected}, {got, Got}}) -> + io_lib:format("Expect the ~s value ~s ~p but got: ~p", [Type, Op, Expected, Got]). + +return(true, _) -> ok; +return(false, Error) -> + {error, Error}.