package main

import (
    “fmt”
    “strings”
    “sync”
    “time”

    “github.com/google/uuid”
)

func NewPubSub() *PubSubTrie {
    return &PubSubTrie{}
}

type PubSubTrie struct {
    children    sync.Map
    subscribers sync.Map
}

type SubscribeFunc func(msg interface{}) error

type Subscriber struct {
    n                *PubSubTrie
    UUID             string
    fn               SubscribeFunc
    activeWithPrefix bool
}

func (s *Subscriber) Unsubscribe() {
    s.n.subscribers.Delete(s.UUID)
}

func (t *PubSubTrie) getChild(word string) *PubSubTrie {
    node := t
    for _, ch := range word {
        if ch == ‘*’ {
            continue
        }
        node = node.child(ch)
    }
    return node
}

func (t *PubSubTrie) child(ch int32) *PubSubTrie {
    trie := &PubSubTrie{}
    if v, ok := t.children.LoadOrStore(ch, trie); ok {
        trie = v.(*PubSubTrie)
    }
    return trie
}

func (t *PubSubTrie) Subscribe(word string, fn SubscribeFunc) Subscriber {
    node := t.getChild(word)
    s := Subscriber{
        n:                node,
        UUID:             uuid.New().String(),
        fn:               fn,
        activeWithPrefix: false,
    }
    if strings.HasSuffix(word, “*”) {
        s.activeWithPrefix = true
    }
    node.subscribers.Store(s.UUID, s)
    return s
}

func (t *PubSubTrie) Publish(topic string, msg interface{}) {
    node := t
    length := len(topic)
    for i, ch := range topic {
        v, ok := node.children.Load(ch)
        if !ok {
            return
        }
        node, ok = v.(*PubSubTrie)
        if !ok {
            return
        }
        node.exec(msg, length == i+1)
    }
}

func (t *PubSubTrie) exec(msg interface{}, isEnd bool) {
    node := t
    node.subscribers.Range(func(key, value interface{}) bool {
        tfn := value.(Subscriber)
        if isEnd || tfn.activeWithPrefix {
            go func(tfn Subscriber) {
                defer func() {
                    if r := recover(); r != nil {
                        fmt.Errorf(“pubsub func panic %s”, r)
                    }
                }()
                tfn.fn(msg)
            }(tfn)
        }
        return true
    })
}

func main() {
    pubsub := NewPubSub()

    pubsub.Subscribe(“abcd”, func(msg interface{}) error {
        fmt.Printf(“%+v\n”, msg)
        return nil
    })

    pubsub.Subscribe(“ab*”, func(msg interface{}) error {
        fmt.Printf(“+ %+v\n”, msg)
        return nil
    })

    for {
        pubsub.Publish(“abcd”, “I am yaoyao”)
        time.Sleep(1 * time.Second)
    }

}

原文地址:http://www.cnblogs.com/Ziee/p/16911391.html

1. 本站所有资源来源于用户上传和网络,如有侵权请邮件联系站长! 2. 分享目的仅供大家学习和交流,请务用于商业用途! 3. 如果你也有好源码或者教程,可以到用户中心发布,分享有积分奖励和额外收入! 4. 本站提供的源码、模板、插件等等其他资源,都不包含技术服务请大家谅解! 5. 如有链接无法下载、失效或广告,请联系管理员处理! 6. 本站资源售价只是赞助,收取费用仅维持本站的日常运营所需! 7. 如遇到加密压缩包,默认解压密码为"gltf",如遇到无法解压的请联系管理员! 8. 因为资源和程序源码均为可复制品,所以不支持任何理由的退款兑现,请斟酌后支付下载 声明:如果标题没有注明"已测试"或者"测试可用"等字样的资源源码均未经过站长测试.特别注意没有标注的源码不保证任何可用性