a41f938cec
As we have seen, dependencies version can prevent the build. We should user lock versions on dependencies that we know work: * Packages are vendored * Add Godep support * Added addtional install step in readme * Fix travis build error
117 lignes
1,8 Kio
Go
117 lignes
1,8 Kio
Go
package pubsub
|
|
|
|
import (
|
|
"sync"
|
|
)
|
|
|
|
type PubSub struct {
|
|
mu sync.Mutex
|
|
next chan item
|
|
closed bool
|
|
}
|
|
|
|
type item struct {
|
|
value interface{}
|
|
next chan item
|
|
}
|
|
|
|
type Subscription struct {
|
|
next chan item
|
|
Values chan interface{}
|
|
mu sync.Mutex
|
|
closed chan struct{}
|
|
}
|
|
|
|
func NewPubSub() (ret *PubSub) {
|
|
return new(PubSub)
|
|
}
|
|
|
|
func (me *PubSub) init() {
|
|
me.next = make(chan item, 1)
|
|
}
|
|
|
|
func (me *PubSub) lazyInit() {
|
|
me.mu.Lock()
|
|
defer me.mu.Unlock()
|
|
if me.closed {
|
|
return
|
|
}
|
|
if me.next == nil {
|
|
me.init()
|
|
}
|
|
}
|
|
|
|
func (me *PubSub) Publish(v interface{}) {
|
|
me.lazyInit()
|
|
next := make(chan item, 1)
|
|
i := item{v, next}
|
|
me.mu.Lock()
|
|
if !me.closed {
|
|
me.next <- i
|
|
me.next = next
|
|
}
|
|
me.mu.Unlock()
|
|
}
|
|
|
|
func (me *Subscription) Close() {
|
|
me.mu.Lock()
|
|
defer me.mu.Unlock()
|
|
select {
|
|
case <-me.closed:
|
|
default:
|
|
close(me.closed)
|
|
}
|
|
}
|
|
|
|
func (me *Subscription) runner() {
|
|
defer close(me.Values)
|
|
for {
|
|
select {
|
|
case i, ok := <-me.next:
|
|
if !ok {
|
|
me.Close()
|
|
return
|
|
}
|
|
// Send the value back into the channel for someone else. This
|
|
// won't block because the channel has a capacity of 1, and this
|
|
// is currently the only copy of this value being sent to this
|
|
// channel.
|
|
me.next <- i
|
|
// The next value comes from the channel given to us by the value
|
|
// we just got.
|
|
me.next = i.next
|
|
select {
|
|
case me.Values <- i.value:
|
|
case <-me.closed:
|
|
return
|
|
}
|
|
case <-me.closed:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (me *PubSub) Subscribe() (ret *Subscription) {
|
|
me.lazyInit()
|
|
ret = &Subscription{
|
|
closed: make(chan struct{}),
|
|
Values: make(chan interface{}),
|
|
}
|
|
me.mu.Lock()
|
|
ret.next = me.next
|
|
me.mu.Unlock()
|
|
go ret.runner()
|
|
return
|
|
}
|
|
|
|
func (me *PubSub) Close() {
|
|
me.mu.Lock()
|
|
defer me.mu.Unlock()
|
|
if me.closed {
|
|
return
|
|
}
|
|
if me.next != nil {
|
|
close(me.next)
|
|
}
|
|
me.closed = true
|
|
}
|