NSQ中有topic和channel的概念,生产者生产topic,消费者消费的时候,两个消费者连接同一个topic和channel只有一个消费者会获得物料,同一个topic不同的channel两个消费者都会获得物料,NSQ通过这样的机制可以实现多播和分发。
要安装github.com/bitly/go-nsq
这个包。
可以用命令go get github.com/bitly/go-nsq
来安装。
建立nsq.go,直接上代码:
package main
import (
"fmt"
"github.com/bitly/go-nsq"
"strconv"
"time"
)
func main() {
producer()
go consumer() // simulate two consumer
consumer2()
}
func producer() {
producer, err := nsq.NewProducer("192.168.33.10:4150", nsq.NewConfig())
defer producer.Stop()
if err != nil {
fmt.Println(err.Error())
}
for i := 0; i < 10000; i++ {
// create a topic named testTopic
err = producer.Publish("testTopic", []byte("testing...."+strconv.Itoa(i)))
if err != nil {
fmt.Println(err.Error())
}
}
}
func consumer() {
// creat a consumer with channel 'channelTestOne'
consumer, err := nsq.NewConsumer("testTopic", "channelTestOne", nsq.NewConfig())
if err != nil {
fmt.Println(err.Error())
}
handler := new(NSQMessageHandler)
handler.msgchan = make(chan *nsq.Message, 1024)
consumer.AddHandler(nsq.HandlerFunc(handler.HandleMessage))
err = consumer.ConnectToNSQLookupd("192.168.33.10:4161")
if err != nil {
fmt.Println(err.Error())
}
handler.Process()
}
func consumer2() {
// creat another consumer with channel 'channelTestTwo'
consumer, err := nsq.NewConsumer("testTopic", "channelTestTwo", nsq.NewConfig())
if err != nil {
fmt.Println(err.Error())
}
handler := new(NSQMessageHandler)
handler.msgchan = make(chan *nsq.Message, 1024)
consumer.AddHandler(nsq.HandlerFunc(handler.HandleMessage))
err = consumer.ConnectToNSQLookupd("192.168.33.10:4161")
if err != nil {
fmt.Println(err.Error())
}
handler.Process()
}
type NSQMessageHandler struct {
msgchan chan *nsq.Message
stop bool
}
func (m *NSQMessageHandler) HandleMessage(message *nsq.Message) error {
if !m.stop {
m.msgchan <- message
}
return nil
}
func (m *NSQMessageHandler) Process() {
m.stop = false
for {
select {
case message := <-m.msgchan:
fmt.Println(string(message.Body))
case <-time.After(time.Second):
if m.stop {
close(m.msgchan)
return
}
}
}
}
搜索
标签
study
ab
amap
apache
apahe
awk
aws
bat
centos
CFS
chrome
cmd
cnpm
composer
consul
crontab
css
curl
cygwin
devops
di
docker
docker,docker-compose
ethereum
excel
fiddler
fluentd
framework
front-end
git
gitgui
github
glide
go
golang
gorm
grafana
gzip
ioc
item2
iterm2
javascript
jenkins
jsonp
kafka
laradock
laravel
larval
linux
liunux
log
mac
mac, wi-fi
macos
magento
mariaDB
minikube
mongoDB
msp
mysql
netbeans
nginx
nodejs
nohup
npm
nsq
php
php-fpm
php7
phpstorm
php扩展
Protobuf
python
redis
scp
server
shell
soap
socket
socket5
sql
sre
ssdb
ssh
ssl
study
sublime
swift
system
td-agent
uml
v2ray
vagrant
vagrnat
vim
vpn
vue
vue.js
webpack
webrtc
websocket
webtatic
windows
windows7
word
wps
xdebug
yarn
yii2
yum
zookeeper
世界国家
互联网
以太坊
分类
前端
小程序
打印机
排序算法
搞笑
权限
粤语
缓存
网络
虚拟机
视频
设计模式
项目管理
热门文章
友情链接