728x90

TR;DL

Multi thread

점점 멀티쓰레드의 중요성이 강조되면서 여러개의 쓰레드에서 하나의 데이터에 접근할때  Safe Thread 적인 방식으로 접근하기 위해서

publiser - subscriber 패턴으로 접근하겠습니다.

 

 

Publisher

pubisher.go

package main

import "context"

type Publisher struct {
	ctx context.Context
	// subscribeCh Channels that only receive string
	subscribeCh chan chan<- string
	publishCh   chan string
	subscribers []chan<- string
}

func NewPublisher(ctx context.Context) *Publisher {
	return &Publisher{
		ctx:         ctx,
		subscribeCh: make(chan chan<- string),
		publishCh:   make(chan string),
		subscribers: make([]chan<- string, 0),
	}
}

func (p *Publisher) Subscribe(sub chan<- string) {
	p.subscribeCh <- sub
}

func (p *Publisher) Pulish(msg string) {
	p.publishCh <- msg
}

func (p *Publisher) Update() {
	for {
		select {
		case sub := <-p.subscribeCh:
			p.subscribers = append(p.subscribers, sub)
		case msg := <-p.publishCh:
			for _, subscriber := range p.subscribers {
				subscriber <- msg
			}
		case <-p.ctx.Done():
			wg.Done()
			return
		}
	}
}

 

Publisher-struct

하나씩 살펴보겠습니다.

subscribeCh chan chan<- string
publishCh   chan string
subscribers []chan<- string

구독자 채널을 받아서 단방향으로 string 타입만 채널로 전달 받습니다. chan 도 하나의 타입이기 때문에 chan chan<- string 이 가능합니다.

 

발행 채널과 구독자들의 이름을 담을 수 있는 채널을 만들었습니다.

 

Publisher-subscribe

func (p *Publisher) Subscribe(sub chan<- string) {
	p.subscribeCh <- sub
}

채널을 받아서 구독자 채널에 push 합니다. 이걸 어디에쓰냐 ?!

 

 

Publisher-Update

func (p *Publisher) Update() {
	for {
		select {
		case sub := <-p.subscribeCh:
			p.subscribers = append(p.subscribers, sub)
		case msg := <-p.publishCh:
			for _, subscriber := range p.subscribers {
				subscriber <- msg
			}
		case <-p.ctx.Done():
			wg.Done()
			return
		}
	}
}

업데이트 때 p.subscribeCh 에 데이터가 있으면 뽑아서 p.subscribers 배열에 추가합니다.

 

사실 Subscirbe receiver 에서 곧바로 배열에 추가해도 됩니다. 하지만 그렇게 되면 safe thread 하지 않아서 Lock을 잡아줘야합니다.

 

Publisher-subscribe (mutex lock, unlock)

func (p *Publisher) Subscribe(sub chan<- string) {
	mutex.Lock()
	// p.subscribeCh <- sub
    	p.subscribers = append(p.subscribers, sub)
    	mutex.Unlock()
}

 

Publisher-publish

 

func (p *Publisher) Publish(msg string) {
	p.publishCh <- msg
}

message가 들어오면 Update receiver 에서 구독자들을 순회하면서 메세지를 전송합니다.

 

 

Subscriber

subscriber.go

package main

import (
	"context"
	"fmt"
)

type Subscriber struct {
	ctx   context.Context
	name  string
	msgCh chan string
}

func NewSubsciber(name string, ctx context.Context) *Subscriber {
	return &Subscriber{
		ctx:   ctx,
		name:  name,
		msgCh: make(chan string),
	}
}

func (s *Subscriber) Subscribe(pub *Publisher) {
	pub.Subscribe(s.msgCh)
}

func (s *Subscriber) Update() {
	for {
		select {
		case msg := <-s.msgCh:
			fmt.Printf("%s got MEssage:%s\n", s.name, msg)
		case <-s.ctx.Done():
			wg.Done()
			return
		}
	}
}

 

Subscriber subscribe receiver

func (s *Subscriber) Subscribe(pub *Publisher) {
	pub.Subscribe(s.msgCh)
}

구독자는 구독을하면 publisher의 Subscribe 함수를 호출합니다.

 

 

Subscriber Update

func (s *Subscriber) Update() {
	for {
		select {
		case msg := <-s.msgCh:
			fmt.Printf("%s got MEssage:%s\n", s.name, msg)
		case <-s.ctx.Done():
			wg.Done()
			return
		}
	}
}

 

구독자의 Update receiver 는 대기하고 있다가 Publisher 가 data를 뿌리면 data를 받아서 출력합니다.

 

 

Main

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

var wg sync.WaitGroup

func main() {
	ctx, cancel := context.WithCancel(context.Background())

	wg.Add(4)
	publisher := NewPublisher(ctx)
	subscriber1 := NewSubsciber("AAA", ctx)
	subscriber2 := NewSubsciber("BBB", ctx)

	go publisher.Update()

	subscriber1.Subscribe(publisher)
	subscriber2.Subscribe(publisher)

	go subscriber1.Update()
	go subscriber2.Update()

	go func() {
		ticker := time.NewTicker(2 * time.Second)
		for {
			select {
			case <-ticker.C:
				publisher.Publish("tick tick")
			case <-ctx.Done():
				wg.Done()
				return
			}
		}
	}()

	fmt.Scanln()
	cancel()
	wg.Wait()
}

 

먼저 취소가능한 Context를 만듭니다.

Publisher를 만들고 구독자 AAA 와 BBB 두명을 만들었습니다.

 

Update function들을 go 루틴으로 각각호출합니다.

구독자 각각 publisher를 구독합니다.

 

ticker를 사용해 2초마다 메세지를 전달합니다.

입력이 들어올때까지 무한대기하다가 입력이 들어오면 cancel() 하고 waitgroup을 종료합니다.

 

 

728x90