NSQ中有topic和channel的概念,生产者生产topic,消费者消费的时候,两个消费者连接同一个topic和channel只有一个消费者会获得物料,同一个topic不同的channel两个消费者都会获得物料,NSQ通过这样的机制可以实现多播和分发。
要安装github.com/bitly/go-nsq这个包。
可以用命令go get github.com/bitly/go-nsq来安装。
建立nsq.go,直接上代码:

  1. package main
  2. import (
  3. "fmt"
  4. "github.com/bitly/go-nsq"
  5. "strconv"
  6. "time"
  7. )
  8. func main() {
  9. producer()
  10. go consumer() // simulate two consumer
  11. consumer2()
  12. }
  13. func producer() {
  14. producer, err := nsq.NewProducer("192.168.33.10:4150", nsq.NewConfig())
  15. defer producer.Stop()
  16. if err != nil {
  17. fmt.Println(err.Error())
  18. }
  19. for i := 0; i < 10000; i++ {
  20. // create a topic named testTopic
  21. err = producer.Publish("testTopic", []byte("testing...."+strconv.Itoa(i)))
  22. if err != nil {
  23. fmt.Println(err.Error())
  24. }
  25. }
  26. }
  27. func consumer() {
  28. // creat a consumer with channel 'channelTestOne'
  29. consumer, err := nsq.NewConsumer("testTopic", "channelTestOne", nsq.NewConfig())
  30. if err != nil {
  31. fmt.Println(err.Error())
  32. }
  33. handler := new(NSQMessageHandler)
  34. handler.msgchan = make(chan *nsq.Message, 1024)
  35. consumer.AddHandler(nsq.HandlerFunc(handler.HandleMessage))
  36. err = consumer.ConnectToNSQLookupd("192.168.33.10:4161")
  37. if err != nil {
  38. fmt.Println(err.Error())
  39. }
  40. handler.Process()
  41. }
  42. func consumer2() {
  43. // creat another consumer with channel 'channelTestTwo'
  44. consumer, err := nsq.NewConsumer("testTopic", "channelTestTwo", nsq.NewConfig())
  45. if err != nil {
  46. fmt.Println(err.Error())
  47. }
  48. handler := new(NSQMessageHandler)
  49. handler.msgchan = make(chan *nsq.Message, 1024)
  50. consumer.AddHandler(nsq.HandlerFunc(handler.HandleMessage))
  51. err = consumer.ConnectToNSQLookupd("192.168.33.10:4161")
  52. if err != nil {
  53. fmt.Println(err.Error())
  54. }
  55. handler.Process()
  56. }
  57. type NSQMessageHandler struct {
  58. msgchan chan *nsq.Message
  59. stop bool
  60. }
  61. func (m *NSQMessageHandler) HandleMessage(message *nsq.Message) error {
  62. if !m.stop {
  63. m.msgchan <- message
  64. }
  65. return nil
  66. }
  67. func (m *NSQMessageHandler) Process() {
  68. m.stop = false
  69. for {
  70. select {
  71. case message := <-m.msgchan:
  72. fmt.Println(string(message.Body))
  73. case <-time.After(time.Second):
  74. if m.stop {
  75. close(m.msgchan)
  76. return
  77. }
  78. }
  79. }
  80. }

分类: web

标签:   nsq   golang