文章
· 六月 27, 2022 阅读大约需 7 分钟

第十二章 信号(二)- 生产者消费者示例

第十二章 信号(二)- 生产者消费者示例

下面是一系列使用信号量实现生产者/消费者场景的类。 “主”进程初始化信号量并等待用户指示活动已全部完成。生产者在循环中随机增加一个信号量值,更新之间的延迟可变。消费者尝试在随机时间从信号量中删除随机数量,也是在循环中。该示例由 5 个类组成:
- Main – 初始化环境并等待信号量上的活动完成的类。
- Counter – 实现信号量本身的类。它记录它的创建以及由于信号量在等待列表中而发生的任何回调。
- Producer – 一个类,其主要方法增加信号量值。增量是一个随机选择的小整数。完成增量后,该方法会在下一个增量之前延迟一小段随机数秒。
- Consumer 消费者——这是对生产者的补充。此类的主要方法尝试将信号量减少一个随机选择的小整数。它将递减请求添加到其等待列表中,等待时间也是随机选择的秒数。
- Util - 这个类有几个方法被示例的其他类使用。几种方法解决了为所有活动维护公共日志的问题;其他人解决了多个消费者和多个生产者的命名问题。

注意:组成这些类的代码特意写得简单。尽可能地,每个语句只完成一个动作。这应该使用户更容易和更直接地修改示例。

Class: Semaphore.Main

此类建立演示环境。它调用实用程序类来初始化日志和名称索引工具。然后它用初始值 0 初始化公共信号量,并等待用户输入一个字符(通常是 ENTER 键),表明实验已经完成。

一旦它接收到用户输入,它就会报告信号量的当前值,尝试删除它,并终止执行。

Class Semaphore.Main Extends %RegisteredObject
{

/// 共享信号量的名称
Parameter ME = "Main";

/// 信号量演示的驱动程序
ClassMethod Run()
{
    // 初始化日志记录全局变量
    d ##class(Semaphore.Util).InitLog()
    d ##class(Semaphore.Util).InitIndex()

    s msg = ..#ME _ " 开始"
    d ..Log(msg)

    // 创建和初始化信号量
    s inventory = ##class(Semaphore.Counter).%New()
    if ('($isobject(inventory))) {
        s msg = "%New() of MySem failed"
        d ..Log(msg)
        q
    }

    s msg = "信号创建结果: " _ inventory.Init(0)
    d ..Log(msg)

    // 等待终止响应
    s msg = "输入任何字符以终止 Run 方法"
    d ..Log(msg)

    r *x

    // 报告最终值,删除信号量并完成
    s msg = "终值 = " _ inventory.GetValue()
    d ..Log(msg)
    s msg = "信号量删除状态: " _ inventory.Delete()
    d ..Log(msg)
    s msg = ..#ME _ " 结束"
    d ..Log(msg)

    q
}

/// 将收到的消息输入到公共日志中
ClassMethod Log(msg As %String) [ Private ]
{
    d ##class(Semaphore.Util).Logger(..#ME, msg)
    q
}

}

Class: Semaphore.Counter

此类实现示例中使用的信号量。根据需要,它是 %SYSTEM.Semaphore 的子类,并提供方法 WaitCompleted 的实现。为了简单起见,初始化信号量的代码也包含在这个类中。还有一个类方法提供此信号量的名称,以允许设置、生产者和消费者类获取它。

Class Semaphore.Counter Extends %SYSTEM.Semaphore
{

ClassMethod Name() As %String
{
    Quit "Counter"
}

/// 直接向日志工具发送消息
Method Log(Msg As %String) [ Private ]
{
    d ##class(Semaphore.Util).Logger(..Name(), Msg)
    q
}

/// 将信号量 id 值作为十六进制字符串返回
Method MyId() As %String
{
   q ("0x" _ $zhex(..SemID))
}

/// 创建实例时调用
Method %OnNew() As %Status
{
    s msg = "新信号"
    d ..Log(msg)
    q $$$OK
}

Method Init(initvalue = 0) As %Status
{
    try {
        if (..Create(..Name(), initvalue)) {
            s msg = "创建 """ _ ..Name() 
                    _ """; 值为 = " _ initvalue
                    _ "; Id = 0x" _ ..MyId()
            d ..Log(msg)
            ret 1
        } else {
            s msg = "信号创建失败: Name = """ _ ..Name() _ """"
            d ..Log(msg)
            ret 0
        }
    } catch {
        s msg = "捕获信号量故障"
        d ..Log(msg)
        ret 0
    }
}

Method %OnClose() As %Status [ Private ]
{
    s msg = "关闭信号量: Id = " _ ..MyId()
    d ..Log(msg)
    q $$$OK
}

/// 此方法由 WaitMany() 作为回调调用。信号量中存在非零数量或等待超时。
/// 减少的数量作为参数传递给此方法;零,在超时的情况下。
/// 
/// 调用此方法后,信号量将从等待多列表中删除。
/// 需要显式调用 AddToWaitMany 才能将其放回等待列表。
Method WaitCompleted(amt As %Integer)
{
    // 只需报告递减量
    s msg = "WaitCompleted: " _ ..MyId() _ "; Amt = " _ amt
    d ..Log(msg)
    q
}

}

Class: Semaphore.Producer

此类负责获取公共信号量的 OREF。一旦它拥有了OREF,它就会尝试将信号量重复增加一个随机选择的小整数,并在每次增量之间暂停一个小的随机选择间隔。每次增加信号量的尝试都会输入到日志中。

Class Semaphore.Producer Extends %RegisteredObject
{

/// 类名
Parameter MeBase = "Producer";

/// 暂停后以少量随机增加信号量
ClassMethod Run() As %Status
{
    // 建立名称和访问信号量
    s ME = ##class(Semaphore.Util).IndexName(..#MeBase)
    s msg = ME _ " 开始"
    d ..Logger(ME, msg)

    s cell = ##class(Semaphore.Counter).%New()
    d cell.Open(##class(Semaphore.Counter).Name())

    s msg = "open Id = " _ cell.MyId()
    d ..Logger(ME, msg)

    // 在随机时间按随机量增加信号量
    for addcnt = 1 : 1 : 8 {
        s incamt = $random(5) + 1
        s waitsec = $random(10) + 1
        s msg = "increment " _ cell.MyId() 
                _ " = " _ cell.GetValue()
                _ " by " _ incamt
                _ " wait " _ waitsec _ " sec"
        d cell.Increment(incamt)
        d ..Logger(ME, msg)
        h waitsec
    }

    // 结束
    s msg = ME _ " 结束"
    d ..Logger(ME, msg)

    q $$$OK
}

/// 将消息传送到日期记录器
ClassMethod Logger(id As %String, msg As %String) [ Private ]
{
    d ##class(Semaphore.Util).Logger(id, msg)
    q
}

}

Class: Semaphore.Consumer

这个类是对 Semaphore.Producer 的补充。它也获得了公共信号量的 OREF,并以与 Producer 类似的方式尝试将信号量重复减少随机选择的数量,并在每次尝试之间随机选择暂停。每次尝试的成功或失败都会写入日志。

Class Semaphore.Consumer Extends %RegisteredObject
{

/// 类名
Parameter MeBase = "Consumer";

/// 暂停后将信号量减少少量随机数
ClassMethod Run() As %Status
{
    // 建立名称和访问信号量
    s ME = ##class(Semaphore.Util).IndexName(..#MeBase)
    s msg = ME _ " 开始"
    d ..Logger(ME, msg)

    s cell = ##class(Semaphore.Counter).%New()
    d cell.Open(##class(Semaphore.Counter).Name())
    s msg = "Consumer: Open Id = " _ cell.MyId()
    d ..Logger(ME, msg)

    // 以不同的数量和不同的时间反复递减信号量
    for deccnt = 1 : 1 : 15 {
        s decamt = $RANDOM(5) + 1
        s waitsec = $RANDOM(10) + 1
        s msg = "Decrement " _ cell.MyId() 
                _ " = " _ cell.GetValue()
                _ " by " _ decamt
                _ " wait " _ waitsec _ " sec"
        // 在这种情况下,我们等待一个信号量,但我们可以一次等待多个信号量减量(最多 200)
        d cell.AddToWaitMany(decamt)
        d ..Logger(ME, msg)
        s result = ##class(%SYSTEM.Semaphore).WaitMany(waitsec)
        s msg = $select((result > 0) : "授权", 1 : "超时")
        d ..Logger(ME, msg)

    }

    // 结束
    s msg = ME _ " 结束"
    d ..Logger(ME, msg)

    q $$$OK
}

/// 将消息传送到日志记录器
ClassMethod Logger(id As %String, msg As %String) [ Private ]
{
    d ##class(Semaphore.Util).Logger(id, msg)
    q
}

}

Class: Semaphore.Util

此类包含解决与此示例相关的两个问题的方法。第一个是保存记录消息所需的结构的初始化,以及归档提交到日志的消息及其后续显示的方法。

第二组方法处理生成编号序列的名称以识别生产者和消费者。这不是严格需要的,因为 $JOB 命令提供的进程 ID 也这样做,但使用更易于阅读的标签更容易。

Class Semaphore.Util Extends %RegisteredObject
{

/// 共享信号量的名称
Parameter ME = "Util";

/// 初始化输出日志
ClassMethod InitLog()
{
    // 初始化日志记录全局
    k ^SemaphoreLog
    s ^SemaphoreLog = 0
    q
}

/// 将收到的消息输入到全局以进行日志记录
/// 
ClassMethod Logger(sender As %String, msg As %String)
{
    s inx = $i(^SemaphoreLog)
    s ^SemaphoreLog(inx, 0) = $job
    s ^SemaphoreLog(inx, 1) = sender
    s ^SemaphoreLog(inx, 2) = msg
    w "(", ^SemaphoreLog, ") ", msg, !
    q
}

/// 显示日志中的消息
ClassMethod ShowLog()
{
    s msgcnt = $g(^SemaphoreLog, 0)
    w "消息日志:条目 = ", msgcnt, !, !
    w "#", ?5, "$Job", ?12, "Sender", ?25, "Message", !

    for i = 1 : 1 : msgcnt {
        s job = ^SemaphoreLog(i, 0)
        s sender = ^SemaphoreLog(i, 1)
        s msg = ^SemaphoreLog(i, 2)
        w i, ")", ?5, job, ?12, sender, ":", ?25, msg, !
    }
    q
}

/// 初始化名称索引
ClassMethod InitIndex()
{
    k ^SemaphoreNames
    q
}

/// 初始化名称索引
ClassMethod IndexName(name As %String) As %String
{
    if ($d(^SemaphoreNames(name)) = 0) {
        s ^SemaphoreNames(name) = 0
    }
    s index =  $i(^SemaphoreNames(name))
    q (name _ "." _ index)
}

}

讨论 (0)1
登录或注册以继续