其他分享
首页 > 其他分享> > Golang 实现nsq

Golang 实现nsq

作者:互联网

//生产者

package main

import (
	"bufio"
	"fmt"
	"github.com/nsqio/go-nsq"
	"os"
	"strings"
)
var producer *nsq.Producer


//初始化生产者

func initProducer(str string) (err error){
	config := nsq.NewConfig()
	producer ,err = nsq.NewProducer(str,config)
	if err !=nil{
		fmt.Printf("create producer failed err :%v\n",err)
		return err
	}
	return nil
}

func main() {
	nsqAddress :="127.0.0.1:4150"
	err := initProducer(nsqAddress)
	if err !=nil{
		fmt.Printf("init producer failed err :%v\n",err)
		return
	}
	fmt.Println("init producer success...")

	//从标准输入获取数据
	reader := bufio.NewReader(os.Stdin)
	for{
		data,err := reader.ReadString('\n')
		if err !=nil{
			fmt.Printf("read string from stdin failed err:%v\n",err)
			continue
		}
		data = strings.TrimSpace(data)
		if strings.ToUpper(data) == "Q"{
			break
		}
		err = producer.Publish("topic_demo1",[]byte(data))
		if err !=nil{
			fmt.Printf("publish msg to nsq failed err :%v\n",err)
			continue
		}
	}
}

  

//消费者

package main

import (
	"fmt"
	"github.com/nsqio/go-nsq"
	"os"
	"os/signal"
	"syscall"
	"time"
)

//消费者
type MyHandler struct {
	msg chan string
}

func (m *MyHandler) HandleMessage(msg *nsq.Message) error {
	m.msg <- string(msg.Body)
	return nil
}

//初始化
func initConsumer(topic string,channel string,address string,ch chan string) (err error) {
	config := nsq.NewConfig()
	config.LookupdPollInterval = 15*time.Second
	consumer ,err := nsq.NewConsumer(topic,channel,config)
	if err !=nil{
		fmt.Printf("create consumer failed err :%v\n",err)
		return err
	}
	consumer.AddHandler(&MyHandler{msg: ch})
	//连接lookupd 查询
	if err := consumer.ConnectToNSQLookupd(address);err !=nil{
		return err
	}
	return nil
}

func main() {
	ch := make(chan string,10)
	err := initConsumer("topic_demo1","first","127.0.0.1:4151",ch)
	if err !=nil{
		fmt.Printf("init consumer failed err :%v\n",err)
		return
	}
	fmt.Println("init consumer success...")

	for{
		tmp,ok := <-ch
		if !ok{
			break
		}
		fmt.Println(tmp)
	}

	c := make(chan os.Signal)
	signal.Notify(c,syscall.SIGINT)
	<-c
}

  

标签:err,producer,实现,fmt,Golang,nil,nsq,data
来源: https://www.cnblogs.com/pebblecome/p/14312345.html