Publisher Subscriber pattern (발행 구독 패턴)
TR;DL
Multi thread
점점 멀티쓰레드의 중요성이 강조되면서 여러개의 쓰레드에서 하나의 데이터에 접근할때 Safe Thread 적인 방식으로 접근하기 위해서
publiser - subscriber 패턴으로 접근하겠습니다.
Publisher
pubisher.go
package main
import "context"
type Publisher struct {
ctx context.Context
// subscribeCh Channels that only receive string
subscribeCh chan chan<- string
publishCh chan string
subscribers []chan<- string
}
func NewPublisher(ctx context.Context) *Publisher {
return &Publisher{
ctx: ctx,
subscribeCh: make(chan chan<- string),
publishCh: make(chan string),
subscribers: make([]chan<- string, 0),
}
}
func (p *Publisher) Subscribe(sub chan<- string) {
p.subscribeCh <- sub
}
func (p *Publisher) Pulish(msg string) {
p.publishCh <- msg
}
func (p *Publisher) Update() {
for {
select {
case sub := <-p.subscribeCh:
p.subscribers = append(p.subscribers, sub)
case msg := <-p.publishCh:
for _, subscriber := range p.subscribers {
subscriber <- msg
}
case <-p.ctx.Done():
wg.Done()
return
}
}
}
Publisher-struct
하나씩 살펴보겠습니다.
subscribeCh chan chan<- string
publishCh chan string
subscribers []chan<- string
구독자 채널을 받아서 단방향으로 string 타입만 채널로 전달 받습니다. chan 도 하나의 타입이기 때문에 chan chan<- string 이 가능합니다.
발행 채널과 구독자들의 이름을 담을 수 있는 채널을 만들었습니다.
Publisher-subscribe
func (p *Publisher) Subscribe(sub chan<- string) {
p.subscribeCh <- sub
}
채널을 받아서 구독자 채널에 push 합니다. 이걸 어디에쓰냐 ?!
Publisher-Update
func (p *Publisher) Update() {
for {
select {
case sub := <-p.subscribeCh:
p.subscribers = append(p.subscribers, sub)
case msg := <-p.publishCh:
for _, subscriber := range p.subscribers {
subscriber <- msg
}
case <-p.ctx.Done():
wg.Done()
return
}
}
}
업데이트 때 p.subscribeCh 에 데이터가 있으면 뽑아서 p.subscribers 배열에 추가합니다.
사실 Subscirbe receiver 에서 곧바로 배열에 추가해도 됩니다. 하지만 그렇게 되면 safe thread 하지 않아서 Lock을 잡아줘야합니다.
Publisher-subscribe (mutex lock, unlock)
func (p *Publisher) Subscribe(sub chan<- string) {
mutex.Lock()
// p.subscribeCh <- sub
p.subscribers = append(p.subscribers, sub)
mutex.Unlock()
}
Publisher-publish
func (p *Publisher) Publish(msg string) {
p.publishCh <- msg
}
message가 들어오면 Update receiver 에서 구독자들을 순회하면서 메세지를 전송합니다.
Subscriber
subscriber.go
package main
import (
"context"
"fmt"
)
type Subscriber struct {
ctx context.Context
name string
msgCh chan string
}
func NewSubsciber(name string, ctx context.Context) *Subscriber {
return &Subscriber{
ctx: ctx,
name: name,
msgCh: make(chan string),
}
}
func (s *Subscriber) Subscribe(pub *Publisher) {
pub.Subscribe(s.msgCh)
}
func (s *Subscriber) Update() {
for {
select {
case msg := <-s.msgCh:
fmt.Printf("%s got MEssage:%s\n", s.name, msg)
case <-s.ctx.Done():
wg.Done()
return
}
}
}
Subscriber subscribe receiver
func (s *Subscriber) Subscribe(pub *Publisher) {
pub.Subscribe(s.msgCh)
}
구독자는 구독을하면 publisher의 Subscribe 함수를 호출합니다.
Subscriber Update
func (s *Subscriber) Update() {
for {
select {
case msg := <-s.msgCh:
fmt.Printf("%s got MEssage:%s\n", s.name, msg)
case <-s.ctx.Done():
wg.Done()
return
}
}
}
구독자의 Update receiver 는 대기하고 있다가 Publisher 가 data를 뿌리면 data를 받아서 출력합니다.
Main
package main
import (
"context"
"fmt"
"sync"
"time"
)
var wg sync.WaitGroup
func main() {
ctx, cancel := context.WithCancel(context.Background())
wg.Add(4)
publisher := NewPublisher(ctx)
subscriber1 := NewSubsciber("AAA", ctx)
subscriber2 := NewSubsciber("BBB", ctx)
go publisher.Update()
subscriber1.Subscribe(publisher)
subscriber2.Subscribe(publisher)
go subscriber1.Update()
go subscriber2.Update()
go func() {
ticker := time.NewTicker(2 * time.Second)
for {
select {
case <-ticker.C:
publisher.Publish("tick tick")
case <-ctx.Done():
wg.Done()
return
}
}
}()
fmt.Scanln()
cancel()
wg.Wait()
}
먼저 취소가능한 Context를 만듭니다.
Publisher를 만들고 구독자 AAA 와 BBB 두명을 만들었습니다.
Update function들을 go 루틴으로 각각호출합니다.
구독자 각각 publisher를 구독합니다.
ticker를 사용해 2초마다 메세지를 전달합니다.
입력이 들어올때까지 무한대기하다가 입력이 들어오면 cancel() 하고 waitgroup을 종료합니다.
'Go' 카테고리의 다른 글
Goroutine 정말 수백만개를 만들어도 문제 없을까? (0) | 2021.10.29 |
---|---|
Fibonacci 로 알아보는 동시성 과 클로저 (0) | 2021.10.26 |
Handling databases using gorm (0) | 2021.10.20 |
Go interface: 대소문자 의 의미 (0) | 2021.10.20 |
Docker 로 Go 프로젝트 빌드 & 실행하기 (0) | 2021.10.05 |
댓글
이 글 공유하기
다른 글
-
Goroutine 정말 수백만개를 만들어도 문제 없을까?
Goroutine 정말 수백만개를 만들어도 문제 없을까?
2021.10.29 -
Fibonacci 로 알아보는 동시성 과 클로저
Fibonacci 로 알아보는 동시성 과 클로저
2021.10.26 -
Handling databases using gorm
Handling databases using gorm
2021.10.20 -
Go interface: 대소문자 의 의미
Go interface: 대소문자 의 의미
2021.10.20