Golang 实现nsq
作者:互联网
//生产者
package main import ( "bufio" "fmt" "github.com/nsqio/go-nsq" "os" "strings" ) var producer *nsq.Producer //初始化生产者 func initProducer(str string) (err error){ config := nsq.NewConfig() producer ,err = nsq.NewProducer(str,config) if err !=nil{ fmt.Printf("create producer failed err :%v\n",err) return err } return nil } func main() { nsqAddress :="127.0.0.1:4150" err := initProducer(nsqAddress) if err !=nil{ fmt.Printf("init producer failed err :%v\n",err) return } fmt.Println("init producer success...") //从标准输入获取数据 reader := bufio.NewReader(os.Stdin) for{ data,err := reader.ReadString('\n') if err !=nil{ fmt.Printf("read string from stdin failed err:%v\n",err) continue } data = strings.TrimSpace(data) if strings.ToUpper(data) == "Q"{ break } err = producer.Publish("topic_demo1",[]byte(data)) if err !=nil{ fmt.Printf("publish msg to nsq failed err :%v\n",err) continue } } }
//消费者
package main import ( "fmt" "github.com/nsqio/go-nsq" "os" "os/signal" "syscall" "time" ) //消费者 type MyHandler struct { msg chan string } func (m *MyHandler) HandleMessage(msg *nsq.Message) error { m.msg <- string(msg.Body) return nil } //初始化 func initConsumer(topic string,channel string,address string,ch chan string) (err error) { config := nsq.NewConfig() config.LookupdPollInterval = 15*time.Second consumer ,err := nsq.NewConsumer(topic,channel,config) if err !=nil{ fmt.Printf("create consumer failed err :%v\n",err) return err } consumer.AddHandler(&MyHandler{msg: ch}) //连接lookupd 查询 if err := consumer.ConnectToNSQLookupd(address);err !=nil{ return err } return nil } func main() { ch := make(chan string,10) err := initConsumer("topic_demo1","first","127.0.0.1:4151",ch) if err !=nil{ fmt.Printf("init consumer failed err :%v\n",err) return } fmt.Println("init consumer success...") for{ tmp,ok := <-ch if !ok{ break } fmt.Println(tmp) } c := make(chan os.Signal) signal.Notify(c,syscall.SIGINT) <-c }
标签:err,producer,实现,fmt,Golang,nil,nsq,data 来源: https://www.cnblogs.com/pebblecome/p/14312345.html