package main
import (
“fmt”
“strings”
“sync”
“time”
“github.com/google/uuid”
)
func NewPubSub() *PubSubTrie {
return &PubSubTrie{}
}
type PubSubTrie struct {
children sync.Map
subscribers sync.Map
}
type SubscribeFunc func(msg interface{}) error
type Subscriber struct {
n *PubSubTrie
UUID string
fn SubscribeFunc
activeWithPrefix bool
}
func (s *Subscriber) Unsubscribe() {
s.n.subscribers.Delete(s.UUID)
}
func (t *PubSubTrie) getChild(word string) *PubSubTrie {
node := t
for _, ch := range word {
if ch == ‘*’ {
continue
}
node = node.child(ch)
}
return node
}
func (t *PubSubTrie) child(ch int32) *PubSubTrie {
trie := &PubSubTrie{}
if v, ok := t.children.LoadOrStore(ch, trie); ok {
trie = v.(*PubSubTrie)
}
return trie
}
func (t *PubSubTrie) Subscribe(word string, fn SubscribeFunc) Subscriber {
node := t.getChild(word)
s := Subscriber{
n: node,
UUID: uuid.New().String(),
fn: fn,
activeWithPrefix: false,
}
if strings.HasSuffix(word, “*”) {
s.activeWithPrefix = true
}
node.subscribers.Store(s.UUID, s)
return s
}
func (t *PubSubTrie) Publish(topic string, msg interface{}) {
node := t
length := len(topic)
for i, ch := range topic {
v, ok := node.children.Load(ch)
if !ok {
return
}
node, ok = v.(*PubSubTrie)
if !ok {
return
}
node.exec(msg, length == i+1)
}
}
func (t *PubSubTrie) exec(msg interface{}, isEnd bool) {
node := t
node.subscribers.Range(func(key, value interface{}) bool {
tfn := value.(Subscriber)
if isEnd || tfn.activeWithPrefix {
go func(tfn Subscriber) {
defer func() {
if r := recover(); r != nil {
fmt.Errorf(“pubsub func panic %s”, r)
}
}()
tfn.fn(msg)
}(tfn)
}
return true
})
}
func main() {
pubsub := NewPubSub()
pubsub.Subscribe(“abcd”, func(msg interface{}) error {
fmt.Printf(“%+v\n”, msg)
return nil
})
pubsub.Subscribe(“ab*”, func(msg interface{}) error {
fmt.Printf(“+ %+v\n”, msg)
return nil
})
for {
pubsub.Publish(“abcd”, “I am yaoyao”)
time.Sleep(1 * time.Second)
}
}
原文地址:http://www.cnblogs.com/Ziee/p/16911391.html
1. 本站所有资源来源于用户上传和网络,如有侵权请邮件联系站长!
2. 分享目的仅供大家学习和交流,请务用于商业用途!
3. 如果你也有好源码或者教程,可以到用户中心发布,分享有积分奖励和额外收入!
4. 本站提供的源码、模板、插件等等其他资源,都不包含技术服务请大家谅解!
5. 如有链接无法下载、失效或广告,请联系管理员处理!
6. 本站资源售价只是赞助,收取费用仅维持本站的日常运营所需!
7. 如遇到加密压缩包,默认解压密码为"gltf",如遇到无法解压的请联系管理员!
8. 因为资源和程序源码均为可复制品,所以不支持任何理由的退款兑现,请斟酌后支付下载
声明:如果标题没有注明"已测试"或者"测试可用"等字样的资源源码均未经过站长测试.特别注意没有标注的源码不保证任何可用性