文章
· 二月 27, 2023 阅读大约需 15 分钟

物联网 (IOT) 在 InterSystems IRIS 平台上的应用

物联网( IoT ,Internet of things )即“万物相连的互联网”,是互联网基础上的延伸和扩展的网络,将各种信息传感设备与网络结合起来而形成的一个巨大网络,实现任何时间、任何地点,人、机、物的互联互通 

物联网是新一代信息技术的重要组成部分,IT行业又叫:泛互联,意指物物相连,万物万联。由此,“物联网就是物物相连的互联网”。这有两层意思:第一,物联网的核心和基础仍然是互联网,是在互联网基础上的延伸和扩展的网络;第二,其用户端延伸和扩展到了任何物品与物品之间,进行信息交换和通信。因此,物联网(Internet of Things,简称IoT)是指通过各种信息传感器、射频识别技术全球定位系统红外感应器、激光扫描器等各种装置与技术,实时采集任何需要监控、 连接、互动的物体或过程,采集其声、光、热、电、力学、化学、生物、位置等各种需要的信息,通过各类可能的网络接入,实现物与物、物与人的泛在连接,实现对物品和过程的智能化感知、识别和管理。物联网是一个基于互联网、传统电信网等的信息承载体,它让所有能够被独立寻址的普通物理对象形成互联互通的网络。在https://baike.baidu.com/item/物联网/7306589?fromtitle=IoT&fromid=552548&fr=aladdin 查看更多信息

IoT(物联网)作为一个互连事物的网络,包括车辆、机器、建筑物、家用设备或任何其他具有嵌入式 TCP/IP 远程连接可用的事物,允许其接收和发送执行指令和数据。每个事物都为物联网网络提供一项或多项服务。例如,智能灯泡提供关灯和开灯服务;智能空调维持环境温度;智能相机在捕捉运动时发送通知。然而,所有这些设备都可以一起运行以提供建筑物安全性,例如。当摄像头检测到运动时,灯就会亮起。那些可以提供业务和集成服务(例如构建安全性)的流程的编排由服务器软件控制器完成。它可以从十几个智能设备接收数据和指令并将其发送到 TCP/IP 网络(如 Internet),以将这些东西作为主要服务进行操作。

事物和服务器软件使用两种协议来互操作数据和指令:MQTT(消息队列遥测传输)和 REST(表述性状态传输)API。第一个更常见,因为它的结构更简单。某些设备的内存和处理器功能有限,没有足够的资源来使用 REST。因此,本文将详细介绍如何使用 MQTT 实现服务器端软件以与物联网进行互操作。

关于 MQTT

MQTT 是一种用于消息队列/消息队列服务的轻量级、发布-订阅、机器对机器网络协议。它专为连接具有资源限制或网络带宽有限的设备的远程位置而设计。它必须在提供有序、无损、双向连接的传输协议上运行——通常是 TCP/IP。它是一个开放的 OASIS 标准和 ISO 推荐标准 (ISO/IEC 20922)。在https://baike.baidu.com/item/MQTT/3618851?fr=aladdin 上查看更多信息。

MQTT 协议有两个角色来在事物之间互操作数据:消息代理和客户端。第一个角色是服务器软件,用于接收和发布由一个或多个客户端生成或使用的消息。另外一个角色是智能设备和服务器软件,用于使用物的服务提供主要服务,例如提供楼宇自动化服务的服务器软件,或使用智能信号量和公共摄像头服务控制城市交通的软件。


MQTT 代理

市场上有各种 MQTT 消息代理,比较受欢迎的是 Eclipse Mosquitto (https://mosquitto.org/),因此我们将在示例中使用它。
MQTT 代理类似于 JMS 或 MQ 代理。它是一种用于接收消息并将其发布到主题中的中间体。因此,连接的客户端可以订阅一个或多个主题以生成或使用来自这些主题的消息。这些主题允许客户端(软件和设备)以一种非耦合和异步的方式完成他们的工作。这意味着每个客户端都会观察一条消息并在必要时采取行动。他们也可以自己发布新消息。例如,如果恒温器在主题“A 楼”中发送一条指示高温的消息,则安装在该建筑物中并列在主题“A 楼”中的空调可以从那里获取此消息并自行打开以降低温度.但是,通常会看到带有业务规则的服务器端软件在温度达到特定值时自动打开空调。如果将 MQTT 代理配置为在关闭时保留消息并在再次打开时恢复它们,则它还可以保证消息传递。

MQTT Broker 的其他优点包括(来自维基百科):

  1. 消除易受攻击和不安全的客户端连接(如果配置为这样做)。
  2. 能够轻松地从单个设备扩展到数千个小工具。
  3. 管理和跟踪所有客户端连接状态,包括安全凭证和证书(如果配置为这样做)。
  4. 在不影响安全性的情况下减少网络压力,如果配置为这样做(使用蜂窝或卫星网络时)。

MQTT消息

有 3 种类型的消息:

  1. 连接:在 TCP/IP 会话中建立代理和客户端之间的连接。
  2. 断开连接:断开服务器和客户端与 TCP/IP 会话的连接。
  3. 发布:将消息放入主题并将该消息的副本发送给订阅该主题的每个客户端。

查看正在运行的消息示例:


InterSystems IRIS 支持物联网和 MQTT 协议

根据 InterSystems IRIS 文档,目前支持的版本是 MQTT 3.1。此 MQTT 规范被定义为 OASIS 标准, MQTT 版本 3.1.1 。 IRIS 的互操作性模块定义了一个入站(用于使用 MQTT 消息)和一个出站(用于生成 MQTT 消息)MQTT 适配器。可以使用这些适配器开发自定义业务服务和操作,或者您可以分别使用内置业务服务和操作EnsLib.MQTT.Service.PassthroughEnsLib.MQTT.Operation.Passthrough

如果您想在互操作性生产之外使用 MQTT 协议,您可以使用较低级别的 %Net.MQTT 类。 MQTT 类使用Eclipse Paho MQTT C 客户端库

创建业务服务以使用 MQTT 消息

Class packagename.NewService1 Extends Ens.BusinessService { Parameter ADAPTER = "EnsLib.MQTT.Adapter.Inbound" ; Method OnProcessInput(pInput As EnsLib.MQTT.Message, pOutput As %RegisteredObject ) As %Status { set tsc= $$$OK set messageContent = pInput.StringValue …. Quit tsc } }

创建业务操作以生成/发送 MQTT 消息

Class packagename.NewOperation1 Extends Ens.BusinessOperation { Parameter ADAPTER = "EnsLib.MQTT.Adapter.Outbound" ; Parameter SETTINGS = "-SendSuperSession" ; Method OnMessage(pRequest As packagename.Request, Output pResponse As packagename.Response) As %Status { #dim tSC As %Status = $$$OK #dim e As %Exception.AbstractException Try { Set message = ##class (EnsLib.MQTT.Message). %New () Set message.Topic = ..Adapter .Topic Set jsonValue = {} Set jsonValue.message = "Response message” Set message.StringValue = jsonValue.%ToJSON() Set tSC=..Adapter.Send(message.Topic,message.StringValue) Set pResponse = ##class(packagename.Response).%New() Set pResponse.message = “Message response” } Catch e { Set tSC=e.AsStatus() } Quit tSC }

入站和出站 MQTT 适配器的设置

业务服务和业务运营都可以配置以下参数(来源:https://docs.intersystems.com/irisforhealthlatest/csp/docbook/DocBook.UI...):

  • 客户 ID:这是向经纪人标识此客户的字符串。它必须是 ASCII 编码的并且包含 1 到 23 个字符。
  • 连接超时:这是连接超时。连接到繁忙的服务器可能需要一些时间,因此可以使用此超时来避免过早的连接失败。它指定在连接尝试失败之前等待的秒数。
  • 凭据名称:这是用于访问 MQTT 代理的一组凭据值的 ID 名称。 Credentials 项中定义的用户名和密码必须是 ASCII 编码的。如果代理不需要登录凭据,则不需要。
  • Keep Alive:指定在客户端发送给代理的消息之间传递的最大秒数。
  • QOS:这决定了所需的服务质量。它可以有以下两个值之一:0 - QOSFireAndForget:不等待来自代理的响应和 1 - QOSWaitForDelivery:等待来自代理的响应,如果代理没有响应则发出错误。
  • 保留:这是向代理指示消息是否应保留的标志。
  • 主题:这是您希望发布或订阅的主题的名称。主题必须是 ASCII 编码的。主题通常是一个层次结构字符串,其子主题级别由 /(正斜杠)分隔。在订阅中,主题可以使用通配符作为主题级别。
  • URL:这是您希望与之通信的经纪人的 URL。该方案是“tcp”或“ssl”,后跟域名和以“:”分隔的端口,例如“tcp://BIGBADAPPLE.local:1883”。通常启用 TLS 的端点配置有 8883 端口,但这不是强制性的。

下载并安装示例应用程序

IRIS IoT 示例是一个简单的应用程序,向您展示如何使用互操作性生产来使用(接收)和生成(发送)MQTT 消息。要获取它,请访问 https://openexchange.intersystems.com/package/IoT-Sample。现在,执行后续步骤。

1. clone/git pull repo 到任何本地目录

-----0-----

2、在该目录下打开终端,运行:

-----1-----

3. 使用您的项目运行 IRIS 容器:

-----2-----

如果要使用 ZPM 安装它,请按照以下步骤操作:

1. 打开启用了互操作性的 IRIS 命名空间。
2. 打开终端并调用:

-----3-----

运行示例应用程序。

1.打开制作,开始制作。它将开始观察 MQTT 主题 /DeviceStatusInputTopic 并生成对 MQTT 主题 /DeviceStatusOutputTopic 的响应。查看:


生产

2. 使用MQTT 客户端发送消息并测试生产。为此,请在您的 Google Chrome 浏览器上访问https://chrome.google.com/webstore/detail/mqttbox/kaajoficamnjijhkeomgfljpicifbkaf 。它是 Chrome 插件 MQTTBox。只需点击添加到 Chrome,然后点击添加应用程序。


MQTT盒子

3. 在您的 Google Chrome 浏览器中,转到 chrome://apps/ 并选择 MQTTBox(如果需要,单击仍然打开)。


打开 MQTTBox

4. 单击创建 MQTT 客户端按钮。

单击创建客户端

5. 使用以下设置配置 MQTT 连接:

  • 客户名称:本地
  • 协议:mqtt/tcp
  • 主机:本地主机:1883
  • 用户名:admin
  • 密码:管理员

所有其他设置保留默认值。

6. 配置 MQTT 主题以发送和接收 MQTT 消息:

  • 要发布的主题:/DeviceStatusInputTopic
  • 订阅主题:/DeviceStatusOutputTopic
  • 有效载荷:
 { "deviceId" : "Air Conditioner Level 1" , "statusDate" : "2023-01-07 14:03:00" , "status" : 0 }


MQTTBox 主题配置
7. 单击按钮订阅以检查关于主题 /DeviceStatusOutputTopic 的消息。


MQTTBox 等待消息

8. 单击按钮 Publish 将消息发送到 /DeviceStatusInputTopic,并在 /DeviceStatusOutputTopic 上查看 IRIS 生产产生的结果。


MQTTBox 主题结果

9. 检查 IRIS 管理门户 Visual Trace 上的消息处理会话


生产视觉跟踪

示例应用程序源代码

Dockerfile

 ARG IMAGE=intersystemsdc/irishealth-community ARG IMAGE=intersystemsdc/iris-community FROM $IMAGE WORKDIR /home/irisowner/irisbuild ARG TESTS= 0 ARG MODULE= "iris-iot-sample" ARG NAMESPACE= "USER" RUN --mount= type = bind ,src=.,dst=. \ iris start IRIS && \ iris session IRIS < iris.script && \ ([ $TESTS -eq 0 ] || iris session iris -U $NAMESPACE "##class(%ZPM.PackageManager).Shell(\"test $MODULE -v -only\",1,1)" ) && \ iris stop IRIS quietly

最新版本的 InterSystems IRIS Community 用于创建 InterSystems IRIS 的 docker 实例,其命名空间名为 USER。

docker-compose 文件

version: '3.6' services: mosquitto: image: eclipse-mosquitto: 2 container_name: mosquitto user : root volumes: - ./mosquitto/config/:/mosquitto/config/ - ./mosquitto/log/:/mosquitto/log/ - ./mosquitto/data/:/mosquitto/data/ ports: - 1883 : 1883 - 9001 : 9001 iris: build: context: . dockerfile: Dockerfile restart: always command: --check-caps false ports: - 1972 - 52795 : 52773 - 53773 volumes: - ./:/irisdev/app

现在创建了两个 docker 容器实例,并且启动并运行。第一个是基于 Eclipse Mosquitto 产品的 MQTT 代理(mosquitto 服务),第二个是 MQTT 服务器端软件,负责消费和生产 MQTT 消息以提供设备监控服务。
mosquitto docker 实例在文件 /mosquitto/config/mosquitto.conf 中配置。在那里定义了端口、消息持久性、安全和日志问题。文件 /mosquitto/config/password.txt 也将用户确定为 admin,密码也是 admin。但是,它是通过命令 mosquitto_passwd -U password.txt 加密的(您可以在https://mosquitto.org/man/mosquitto_passwd-1.html阅读更多相关信息)。

虹膜脚本文件

zn "%SYS" Do ##class (Security.Users).UnExpireUserPasswords( "*" ) zn "USER" Do ##class (EnsPortal.Credentials).SaveItem( 0 , "mosquitto_cred" , "mosquitto_cred" , "admin" , "admin" , "" ) zpm "load /home/irisowner/irisbuild/ -v" : 1 : 1 halt

该文件为业务服务和业务操作创建凭证。登录到 MQTT Broker 并运行 ZPM 文件 module.xml。

模块.xml

该文件用于在服务器上编译源代码以及在使用 ZPM 时安装示例应用程序。

 <?xml version= "1.0" encoding= "UTF-8" ?> <Export generator= "Cache" version= "25" > <Document name= "iris-iot-sample.ZPM" > <Module> <Name>iris-iot-sample</Name> <Description>A simple IRIS interoperability application - for IoT using MQTT.</Description> <Version> 1.0 .8</Version> <Packaging> module </Packaging> <Dependencies> <ModuleReference> <Name>sslclient</Name> <Version> 1.0 .1</Version> </ModuleReference> </Dependencies> <SourcesRoot>src</SourcesRoot> <Resource Name= "dc.irisiotsample.PKG" /> <SystemRequirements Version= ">=2020.1" Interoperability= "enabled" /> </Module> </Document> </Export>

在 SourceRoot 标签上有要编译的源包。

DeviceStatus 持久类

Class dc.irisiotsample.DeviceStatus Extends %Persistent { Property deviceId As %String ; Property statusDate As %TimeStamp ; Property status As %Boolean ; Storage Default { <Data name= "DeviceStatusDefaultData" > <Value name= "1" > <Value>% %CLASSNAME </Value> </Value> <Value name= "2" > <Value>deviceId</Value> </Value> <Value name= "3" > <Value>statusDate</Value> </Value> <Value name= "4" > <Value>status</Value> </Value> </Data> <DataLocation> ^dc .irisiotsample.DeviceStatusD</DataLocation> <DefaultData>DeviceStatusDefaultData</DefaultData> <IdLocation> ^dc .irisiotsample.DeviceStatusD</IdLocation> <IndexLocation> ^dc .irisiotsample.DeviceStatusI</IndexLocation> <StreamLocation> ^dc .irisiotsample.DeviceStatusS</StreamLocation> <Type> %Storage.Persistent </Type> } }

此类用于将 DeviceStatus 保存在 SQL 表中。

DeviceStatusRequest 类

Class dc.irisiotsample.DeviceStatusRequest Extends Ens.Request { Property deviceId As %String ; Property statusDate As %TimeStamp ; Property status As %Boolean ; Storage Default { <Data name= "DeviceStatusRequestDefaultData" > <Subscript> "DeviceStatusRequest" </Subscript> <Value name= "1" > <Value>deviceId</Value> </Value> <Value name= "2" > <Value>statusDate</Value> </Value> <Value name= "3" > <Value>status</Value> </Value> </Data> <DefaultData>DeviceStatusRequestDefaultData</DefaultData> <Type> %Storage.Persistent </Type> } }

此类用于在互操作性组件之间交换数据。

设备状态服务

Class dc.irisiotsample.DeviceStatusService Extends Ens.BusinessService { Parameter ADAPTER = "EnsLib.MQTT.Adapter.Inbound" ; Method OnProcessInput(pInput As EnsLib.MQTT.Message, pOutput As Ens.StringResponse) As %Status { set tsc= $$$OK set DeviceStatusValue = ##class ( %DynamicAbstractObject ). %FromJSON (pInput.StringValue) set DeviceStatus = ##class (dc.irisiotsample.DeviceStatusRequest). %New () set DeviceStatus.deviceId = DeviceStatusValue.deviceId set DeviceStatus.statusDate = DeviceStatusValue.statusDate set DeviceStatus.status = DeviceStatusValue.status set tsc = ..SendRequestSync ( "DeviceStatusProcess" , DeviceStatus, .Response, - 1 , "Device Status Process" ) set pOutput = Response quit tsc } }

adapter 参数表示在接收 MQTT 消息时使用 MQTT 适配器。方法 OnProcessInput 在 pInput 上接收 MQTT 消息并将其发送到业务流程,调用方法 SendRequestSync 作为同步消息。

设备状态进程

Class dc.irisiotsample.DeviceStatusProcess Extends Ens.BusinessProcess { Method OnRequest(request As dc.irisiotsample.DeviceStatusRequest, Output response As Ens.StringResponse) As %Status { Set tsc = 1 Set response = ##class (Ens.StringResponse). %New () Set DeviceStatus = ##class (dc.irisiotsample.DeviceStatus). %New () Set DeviceStatus.deviceId = request.deviceId Set DeviceStatus.status = request.status Set DeviceStatus.statusDate = request.statusDate Set tsc = DeviceStatus. %Save () If $$$ISOK (tsc) { Set tsc = ..SendRequestSync ( "DeviceStatusOperation" , request, .pResponse, - 1 , "Device Status Operation" ) Set response.StringValue = "Device id " _pResponse.deviceId_ " has the status " _pResponse.status } Else { Set response.StringValue = "Error on save the device status" Set SuspendMessage = 1 } quit tsc } Storage Default { <Type> %Storage.Persistent </Type> } }

此类从业务服务接收消息,将其保存到数据库,并调用 DeviceStatusOperation 发送(生成)带有结果的消息。

DeviceStatusOperation

 Class dc.irisiotsample.DeviceStatusOperation Extends Ens.BusinessOperation { Parameter ADAPTER = "EnsLib.MQTT.Adapter.Outbound" ; Parameter SETTINGS = "-SendSuperSession" ; Method NotifyDeviceStatus(pRequest As dc.irisiotsample.DeviceStatusRequest, Output pResponse As dc.irisiotsample.DeviceStatusResponse) As %Status { #dim tSC As %Status = $$$OK #dim e As %Exception.AbstractException Try { Set message = ##class (EnsLib.MQTT.Message). %New () Set message.Topic = ..Adapter .Topic Set jsonValue = {} Set jsonValue.message = "Device " _pRequest.deviceId_ " has status " _pRequest.status Set message.StringValue = jsonValue. %ToJSON () Set tSC= ..Adapter .Send(message.Topic,message.StringValue) Set pResponse = ##class (dc.irisiotsample.DeviceStatusResponse). %New () Set pResponse.deviceId = pRequest.deviceId Set pResponse.status = pRequest.status } Catch e { Set tSC=e.AsStatus() } Quit tSC } XData MessageMap { <MapItems> <MapItem MessageType= "dc.irisiotsample.DeviceStatusRequest" > <Method>NotifyDeviceStatus</Method> </MapItem> </MapItems> } }

此类从业务流程接收消息,并生成一条 MQTT 消息,发送到具有流程响应的主题。

设备状态生产

Class dc.irisiotsample.DeviceStatusProduction Extends Ens.Production { XData ProductionDefinition { <Production Name= "dc.irisiotsample.DeviceStatusProduction" LogGeneralTraceEvents= "false" > <Description></Description> <ActorPoolSize> 2 </ActorPoolSize> <Item Name= "DeviceStatusService" Category= "" ClassName= "dc.irisiotsample.DeviceStatusService" PoolSize= "1" Enabled= "true" Foreground= "false" Comment= "" LogTraceEvents= "false" Schedule= "" > <Setting Target= "Adapter" Name= "ClientID" >InterSystemsIRIS</Setting> <Setting Target= "Adapter" Name= "Topic" >/DeviceStatusInputTopic</Setting> <Setting Target= "Adapter" Name= "Url" >tcp: //mosquitto:1883</Setting> <Setting Target= "Adapter" Name= "CredentialsName" >mosquitto_cred</Setting> </Item> <Item Name= "DeviceStatusProcess" Category= "" ClassName= "dc.irisiotsample.DeviceStatusProcess" PoolSize= "1" Enabled= "true" Foreground= "false" Comment= "" LogTraceEvents= "false" Schedule= "" > </Item> <Item Name= "DeviceStatusOperation" Category= "" ClassName= "dc.irisiotsample.DeviceStatusOperation" PoolSize= "1" Enabled= "true" Foreground= "false" Comment= "" LogTraceEvents= "false" Schedule= "" > <Setting Target= "Adapter" Name= "ClientID" >InterSystemsIRIS</Setting> <Setting Target= "Adapter" Name= "Topic" >/DeviceStatusOutputTopic</Setting> <Setting Target= "Adapter" Name= "Url" >tcp: //mosquitto:1883</Setting> <Setting Target= "Adapter" Name= "CredentialsName" >mosquitto_cred</Setting> </Item> </Production> } }

此类使用所需参数配置业务服务、业务流程和业务操作,并将这些组件放在一起工作。

主要配置的参数有:

  • URL:设置 MQTT Broker 地址。
  • 主题:接收和发送 MQTT 消息的队列。 DeviceStatusInputTopic 用于业务服务,DeviceStatusOutputTopic 用于业务操作。
  • CredentialsName:使用用户名和密码设置凭据以连接 MQTT 代理。
  • ClientID:它是为 InterSystems IRIS 分配的逻辑名称,代理需要它来识别 MQTT 客户端。

如您所见,用于 MQTT 的 IRIS 互操作性适配器是一个非常简单且功能强大的工具,用于使用和生成 MQTT 消息以及自动化涉及 IoT 设备的业务流。所以享受它。

查看原贴由 @Yuri Marx 撰写

讨论 (0)1
登录或注册以继续