RabbitMQ 是由 LShift 提供的一个 Advanced Message Queuing Protocol (AMQP) 的开源实现,由以高性能、健壮以及可伸缩性出名的 Erlang 写成,因此也是继承了这些优点。
AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。它从生产者接收消息并递送给消费者,在这个过程中,根据规则进行路由,缓存与持久化。
而在AMQP中主要有两个组件:Exchange 和 Queue (在 AMQP 1.0 里还会有变动),如下图所示,绿色的 X 就是 Exchange ,红色的是 Queue ,这两者都在 Server 端,又称作 Broker ,这部分是 RabbitMQ 实现的,而蓝色的则是客户端,通常有 Producer 和 Consumer 两种类型:
- Broker:简单来说就是消息队列服务器实体
- Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列
- Queue:消息队列载体,每个消息都会被投入到一个或多个队列
- Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来
- Routing Key:路由关键字,exchange根据这个关键字进行消息投递
- vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离
- producer:消息生产者,就是投递消息的程序
- consumer:消息消费者,就是接受消息的程序
- channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务
- 可靠性:包括消息持久化,消费者和生产者的消息确认
- 灵活路由:遵循AMQP协议,支持多种Exchange类型实现不同路由策略
- 分布式:集群的支持,包括本地网络与远程网络
- 高可用性:支持主从备份与镜像队列
- 多语言支持:支持多语言的客户端
- WEB界面管理:可以管理用户权限,exhange,queue,binding,与实时监控
- 访问控制:基于vhosts实现访问控制
- 调试追踪:支持tracing,方便调试
lion@ubuntu1404:~$ sudo echo 'deb http://www.rabbitmq.com/debian/ testing main' | sudo tee /etc/apt/sources.list.d/rabbitmq.list
lion@ubuntu1404:~$ wget -O- https://www.rabbitmq.com/rabbitmq-release-signing-key.asc | sudo apt-key add -
lion@ubuntu1404:~$ sudo apt-get update
lion@ubuntu1404:~$ sudo apt-get install rabbitmq-server
lion@node1:~$ sudo apt-get install -y erlang-nox erlang-dev erlang-src
4.2.2、Rabbitmq 3.6.3安装
lion@node1:~$ mkdir -p _app
lion@node1:~/_app$ wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.3/rabbitmq-server-generic-unix-3.6.3.tar.xz
lion@node1:~/_app$ xz -d rabbitmq-server-generic-unix-3.6.3.tar.xz
lion@node1:~/_app$ tar -xvf rabbitmq-server-generic-unix-3.6.3.tar
lion@node1:~/_app$ cd rabbitmq_server-3.6.3
lion@node1:~$ vi .bashrc
export RABBITMQ_HOME="/home/lion/_app/rabbitmq_server-3.6.3"
lion@node1:~$ source .bashrc
lion@node1:~$ rabbitmq-server
lion@node1:~$ rabbitmqctl stop
lion@node1:~$ rabbitmqctl start
lion@node1:~$ rabbitmqctl add_user lion 123456
lion@node1:~$ rabbitmqctl list_users
Listing users ...
guest [administrator]
lion []
这个时候lion用户是不能访问web管理插件的,需要配置用户角色,用户角色可分为五类,超级管理员, 监控者, 策略制定者, 普通管理者以及其他。
- 超级管理员(administrator)
可登陆管理控制台(启用management plugin的情况下),可查看所有的信息,并且可以对用户,策略(policy)进行操作。
- 监控者(monitoring)
可登陆管理控制台(启用management plugin的情况下),同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)
- 策略制定者(policymaker)
可登陆管理控制台(启用management plugin的情况下), 同时可以对policy进行管理。但无法查看节点的相关信息。
- 普通管理者(management)
仅可登陆管理控制台(启用management plugin的情况下),无法看到节点信息,也无法对策略进行管理。
- 其他
lion@node1:~$ rabbitmqctl set_user_tags lion administrator
lion@node1:~$ rabbitmq-plugins enable rabbitmq_management (启用插件)
lion@node1:~$ rabbitmq-plugins disable rabbitmq_management (禁用插件)
4.4、RabbitMQ 的配置文件介绍
%% -*- mode: erlang -*-
%% ----------------------------------------------------------------------------
%% RabbitMQ Sample Configuration File.
%% See http://www.rabbitmq.com/configure.html for details.
%% ----------------------------------------------------------------------------
%% Network Connectivity
%% ====================
%% By default, RabbitMQ will listen on all interfaces, using
%% the standard (reserved) AMQP port.
%% 默认的监听端口
%% {tcp_listeners, [5672]},
%% To listen on a specific interface, provide a tuple of {IpAddress, Port}.
%% For example, to listen only on localhost for both IPv4 and IPv6:
%% 也可以使用下面的格式进行指定IP和端口的监听
%% {tcp_listeners, [{"", 5672},
%% {"::1", 5672}]},
%% SSL listeners are configured in the same fashion as TCP listeners,
%% including the option to control the choice of interface.
%% SSL连接端口配置
%% {ssl_listeners, [5671]},
%% Number of Erlang processes that will accept connections for the TCP
%% and SSL listeners.
%% TCP连接的进程数
%% {num_tcp_acceptors, 10},
%% {num_ssl_acceptors, 1},
%% Maximum time for AMQP 0-8/0-9/0-9-1 handshake (after socket connection
%% and SSL handshake), in milliseconds.
%% 超时时间,单位毫秒
%% {handshake_timeout, 10000},
%% Log levels (currently just used for connection logging).
%% One of 'debug', 'info', 'warning', 'error' or 'none', in decreasing
%% order of verbosity. Defaults to 'info'.
%% 日志的级别,默认是info
%% {log_levels, [{connection, info}, {channel, info}]},
%% Set to 'true' to perform reverse DNS lookups when accepting a
%% connection. Hostnames will then be shown instead of IP addresses
%% in rabbitmqctl and the management plugin.
%% {reverse_dns_lookups, true},
%% Security / AAA
%% ==============
%% 安全配置
%% The default "guest" user is only permitted to access the server
%% via a loopback interface (e.g. localhost).
%% {loopback_users, [<<"guest">>]},
%% Uncomment the following line if you want to allow access to the
%% guest user from anywhere on the network.
%% {loopback_users, []},
%% Configuring SSL.
%% See http://www.rabbitmq.com/ssl.html for full documentation.
%% {ssl_options, [{cacertfile, "/path/to/testca/cacert.pem"},
%% {certfile, "/path/to/server/cert.pem"},
%% {keyfile, "/path/to/server/key.pem"},
%% {verify, verify_peer},
%% {fail_if_no_peer_cert, false}]},
%% Choose the available SASL mechanism(s) to expose.
%% The two default (built in) mechanisms are 'PLAIN' and
%% 'AMQPLAIN'. Additional mechanisms can be added via
%% plugins.
%% See http://www.rabbitmq.com/authentication.html for more details.
%% {auth_mechanisms, ['PLAIN', 'AMQPLAIN']},
%% Select an authentication database to use. RabbitMQ comes bundled
%% with a built-in auth-database, based on mnesia.
%% {auth_backends, [rabbit_auth_backend_internal]},
%% Configurations supporting the rabbitmq_auth_mechanism_ssl and
%% rabbitmq_auth_backend_ldap plugins.
%% NB: These options require that the relevant plugin is enabled.
%% See http://www.rabbitmq.com/plugins.html for further details.
%% The RabbitMQ-auth-mechanism-ssl plugin makes it possible to
%% authenticate a user based on the client's SSL certificate.
%% To use auth-mechanism-ssl, add to or replace the auth_mechanisms
%% list with the entry 'EXTERNAL'.
%% {auth_mechanisms, ['EXTERNAL']},
%% The rabbitmq_auth_backend_ldap plugin allows the broker to
%% perform authentication and authorisation by deferring to an
%% external LDAP server.
%% For more information about configuring the LDAP backend, see
%% http://www.rabbitmq.com/ldap.html.
%% Enable the LDAP auth backend by adding to or replacing the
%% auth_backends entry:
%% {auth_backends, [rabbit_auth_backend_ldap]},
%% This pertains to both the rabbitmq_auth_mechanism_ssl plugin and
%% STOMP ssl_cert_login configurations. See the rabbitmq_stomp
%% configuration section later in this file and the README in
%% https://github.com/rabbitmq/rabbitmq-auth-mechanism-ssl for further
%% details.
%% To use the SSL cert's CN instead of its DN as the username
%% {ssl_cert_login_from, common_name},
%% SSL handshake timeout, in milliseconds.
%% {ssl_handshake_timeout, 5000},
%% Password hashing implementation. Will only affect newly
%% created users. To recalculate hash for an existing user
%% it's necessary to update her password.
%% {password_hashing_module, rabbit_password_hashing_sha256},
%% Default User / VHost
%% ====================
%% 用户访问设置
%% On first start RabbitMQ will create a vhost and a user. These
%% config items control what gets created. See
%% http://www.rabbitmq.com/access-control.html for further
%% information about vhosts and access control.
%% {default_vhost, <<"/">>},
%% {default_user, <<"guest">>},
%% {default_pass, <<"guest">>},
%% {default_permissions, [<<".*">>, <<".*">>, <<".*">>]},
%% Tags for default user
%% For more details about tags, see the documentation for the
%% Management Plugin at http://www.rabbitmq.com/management.html.
%% {default_user_tags, [administrator]},
%% Additional network and protocol related configuration
%% =====================================================
%% Set the default AMQP heartbeat delay (in seconds).
%% 设置默认AMQP心跳延迟(秒)
%% {heartbeat, 600},
%% Set the max permissible size of an AMQP frame (in bytes).
%% {frame_max, 131072},
%% Set the max frame size the server will accept before connection
%% tuning occurs
%% {initial_frame_max, 4096},
%% Set the max permissible number of channels per connection.
%% 0 means "no limit".
%% {channel_max, 128},
%% Customising Socket Options.
%% See (http://www.erlang.org/doc/man/inet.html#setopts-2) for
%% further documentation.
%% {tcp_listen_options, [{backlog, 128},
%% {nodelay, true},
%% {exit_on_close, false}]},
%% Resource Limits & Flow Control
%% ==============================
%% See http://www.rabbitmq.com/memory.html for full details.
%% Memory-based Flow Control threshold.
%% {vm_memory_high_watermark, 0.4},
%% Alternatively, we can set a limit (in bytes) of RAM used by the node.
%% {vm_memory_high_watermark, {absolute, 1073741824}},
%% Or you can set absolute value using memory units.
%% {vm_memory_high_watermark, {absolute, "1024M"}},
%% Supported units suffixes:
%% k, kiB: kibibytes (2^10 bytes)
%% M, MiB: mebibytes (2^20)
%% G, GiB: gibibytes (2^30)
%% kB: kilobytes (10^3)
%% MB: megabytes (10^6)
%% GB: gigabytes (10^9)
%% Fraction of the high watermark limit at which queues start to
%% page message out to disc in order to free up memory.
%% Values greater than 0.9 can be dangerous and should be used carefully.
%% 内存最大使用比例
%% {vm_memory_high_watermark_paging_ratio, 0.5},
%% Interval (in milliseconds) at which we perform the check of the memory
%% levels against the watermarks.
%% 检查内存的间隔(毫秒)
%% {memory_monitor_interval, 2500},
%% Set disk free limit (in bytes). Once free disk space reaches this
%% lower bound, a disk alarm will be set - see the documentation
%% listed above for more details.
%% {disk_free_limit, 50000000},
%% Or you can set it using memory units (same as in vm_memory_high_watermark)
%% {disk_free_limit, "50MB"},
%% {disk_free_limit, "50000kB"},
%% {disk_free_limit, "2GB"},
%% Alternatively, we can set a limit relative to total available RAM.
%% Values lower than 1.0 can be dangerous and should be used carefully.
%% {disk_free_limit, {mem_relative, 2.0}},
%% Misc/Advanced Options
%% =====================
%% NB: Change these only if you understand what you are doing!
%% To announce custom properties to clients on connection:
%% {server_properties, []},
%% How to respond to cluster partitions.
%% See http://www.rabbitmq.com/partitions.html for further details.
%% {cluster_partition_handling, ignore},
%% Make clustering happen *automatically* at startup - only applied
%% to nodes that have just been reset or started for the first time.
%% See http://www.rabbitmq.com/clustering.html#auto-config for
%% further details.
%% 设置集群启动的节点
%% {cluster_nodes, {['rabbit@my.host.com'], disc}},
%% Interval (in milliseconds) at which we send keepalive messages
%% to other cluster members. Note that this is not the same thing
%% as net_ticktime; missed keepalive messages will not cause nodes
%% to be considered down.
%% 集群消息同步的时间(毫秒)
%% {cluster_keepalive_interval, 10000},
%% Set (internal) statistics collection granularity.
%% {collect_statistics, none},
%% Statistics collection interval (in milliseconds).
%% {collect_statistics_interval, 5000},
%% Explicitly enable/disable hipe compilation.
%% {hipe_compile, true},
%% Timeout used when waiting for Mnesia tables in a cluster to
%% become available.
%% {mnesia_table_loading_timeout, 30000},
%% Size in bytes below which to embed messages in the queue index. See
%% http://www.rabbitmq.com/persistence-conf.html
%% {queue_index_embed_msgs_below, 4096}
%% ----------------------------------------------------------------------------
%% Advanced Erlang Networking/Clustering Options.
%% See http://www.rabbitmq.com/clustering.html for details
%% ----------------------------------------------------------------------------
[%% Sets the net_kernel tick time.
%% Please see http://erlang.org/doc/man/kernel_app.html and
%% http://www.rabbitmq.com/nettick.html for further details.
%% {net_ticktime, 60}
%% ----------------------------------------------------------------------------
%% RabbitMQ Management Plugin
%% See http://www.rabbitmq.com/management.html for details
%% ----------------------------------------------------------------------------
[%% Pre-Load schema definitions from the following JSON file. See
%% http://www.rabbitmq.com/management.html#load-definitions
%% {load_definitions, "/path/to/schema.json"},
%% Log all requests to the management HTTP API to a file.
%% 所有请求的HTTP API文件日志的路径。
%% {http_log_dir, "/path/to/access.log"},
%% Change the port on which the HTTP listener listens,
%% specifying an interface for the web server to bind to.
%% Also set the listener to use SSL and provide SSL options.
%% Web管理的地址和端口
%% {listener, [{port, 12345},
%% {ip, ""},
%% {ssl, true},
%% {ssl_opts, [{cacertfile, "/path/to/cacert.pem"},
%% {certfile, "/path/to/cert.pem"},
%% {keyfile, "/path/to/key.pem"}]}]},
%% One of 'basic', 'detailed' or 'none'. See
%% http://www.rabbitmq.com/management.html#fine-stats for more details.
%% {rates_mode, basic},
%% Configure how long aggregated data (such as message rates and queue
%% lengths) is retained. Please read the plugin's documentation in
%% http://www.rabbitmq.com/management.html#configuration for more
%% details.
%% {sample_retention_policies,
%% [{global, [{60, 5}, {3600, 60}, {86400, 1200}]},
%% {basic, [{60, 5}, {3600, 60}]},
%% {detailed, [{10, 5}]}]}
%% ----------------------------------------------------------------------------
%% RabbitMQ Shovel Plugin
%% See http://www.rabbitmq.com/shovel.html for details
%% ----------------------------------------------------------------------------
[%% A named shovel worker.
%% {my_first_shovel,
%% [
%% List the source broker(s) from which to consume.
%% {sources,
%% [%% URI(s) and pre-declarations for all source broker(s).
%% {brokers, ["amqp://user:password@host.domain/my_vhost"]},
%% {declarations, []}
%% ]},
%% List the destination broker(s) to publish to.
%% {destinations,
%% [%% A singular version of the 'brokers' element.
%% {broker, "amqp://"},
%% {declarations, []}
%% ]},
%% Name of the queue to shovel messages from.
%% {queue, <<"your-queue-name-goes-here">>},
%% Optional prefetch count.
%% {prefetch_count, 10},
%% when to acknowledge messages:
%% - no_ack: never (auto)
%% - on_publish: after each message is republished
%% - on_confirm: when the destination broker confirms receipt
%% {ack_mode, on_confirm},
%% Overwrite fields of the outbound basic.publish.
%% {publish_fields, [{exchange, <<"my_exchange">>},
%% {routing_key, <<"from_shovel">>}]},
%% Static list of basic.properties to set on re-publication.
%% {publish_properties, [{delivery_mode, 2}]},
%% The number of seconds to wait before attempting to
%% reconnect in the event of a connection failure.
%% {reconnect_delay, 2.5}
%% ]} %% End of my_first_shovel
%% Rather than specifying some values per-shovel, you can specify
%% them for all shovels here.
%% {defaults, [{prefetch_count, 0},
%% {ack_mode, on_confirm},
%% {publish_fields, []},
%% {publish_properties, [{delivery_mode, 2}]},
%% {reconnect_delay, 2.5}]}
%% ----------------------------------------------------------------------------
%% RabbitMQ Stomp Adapter
%% See http://www.rabbitmq.com/stomp.html for details
%% ----------------------------------------------------------------------------
[%% Network Configuration - the format is generally the same as for the broker
%% Listen only on localhost (ipv4 & ipv6) on a specific port.
%% {tcp_listeners, [{"", 61613},
%% {"::1", 61613}]},
%% Listen for SSL connections on a specific port.
%% {ssl_listeners, [61614]},
%% Number of Erlang processes that will accept connections for the TCP
%% and SSL listeners.
%% {num_tcp_acceptors, 10},
%% {num_ssl_acceptors, 1},
%% Additional SSL options
%% Extract a name from the client's certificate when using SSL.
%% {ssl_cert_login, true},
%% Set a default user name and password. This is used as the default login
%% whenever a CONNECT frame omits the login and passcode headers.
%% Please note that setting this will allow clients to connect without
%% authenticating!
%% {default_user, [{login, "guest"},
%% {passcode, "guest"}]},
%% If a default user is configured, or you have configured use SSL client
%% certificate based authentication, you can choose to allow clients to
%% omit the CONNECT frame entirely. If set to true, the client is
%% automatically connected as the default user or user supplied in the
%% SSL certificate whenever the first frame sent on a session is not a
%% CONNECT frame.
%% {implicit_connect, true}
%% ----------------------------------------------------------------------------
%% RabbitMQ MQTT Adapter
%% See https://github.com/rabbitmq/rabbitmq-mqtt/blob/stable/README.md
%% for details
%% ----------------------------------------------------------------------------
[%% Set the default user name and password. Will be used as the default login
%% if a connecting client provides no other login details.
%% Please note that setting this will allow clients to connect without
%% authenticating!
%% {default_user, <<"guest">>},
%% {default_pass, <<"guest">>},
%% Enable anonymous access. If this is set to false, clients MUST provide
%% login information in order to connect. See the default_user/default_pass
%% configuration elements for managing logins without authentication.
%% {allow_anonymous, true},
%% If you have multiple chosts, specify the one to which the
%% adapter connects.
%% {vhost, <<"/">>},
%% Specify the exchange to which messages from MQTT clients are published.
%% {exchange, <<"amq.topic">>},
%% Specify TTL (time to live) to control the lifetime of non-clean sessions.
%% {subscription_ttl, 1800000},
%% Set the prefetch count (governing the maximum number of unacknowledged
%% messages that will be delivered).
%% {prefetch, 10},
%% TCP/SSL Configuration (as per the broker configuration).
%% {tcp_listeners, [1883]},
%% {ssl_listeners, []},
%% Number of Erlang processes that will accept connections for the TCP
%% and SSL listeners.
%% {num_tcp_acceptors, 10},
%% {num_ssl_acceptors, 1},
%% TCP/Socket options (as per the broker configuration).
%% {tcp_listen_options, [{backlog, 128},
%% {nodelay, true}]}
%% ----------------------------------------------------------------------------
%% RabbitMQ AMQP 1.0 Support
%% See https://github.com/rabbitmq/rabbitmq-amqp1.0/blob/stable/README.md
%% for details
%% ----------------------------------------------------------------------------
[%% Connections that are not authenticated with SASL will connect as this
%% account. See the README for more information.
%% Please note that setting this will allow clients to connect without
%% authenticating!
%% {default_user, "guest"},
%% Enable protocol strict mode. See the README for more information.
%% {protocol_strict_mode, false}
%% ----------------------------------------------------------------------------
%% RabbitMQ LDAP Plugin
%% See http://www.rabbitmq.com/ldap.html for details.
%% ----------------------------------------------------------------------------
%% Connecting to the LDAP server(s)
%% ================================
%% Specify servers to bind to. You *must* set this in order for the plugin
%% to work properly.
%% {servers, ["your-server-name-goes-here"]},
%% Connect to the LDAP server using SSL
%% {use_ssl, false},
%% Specify the LDAP port to connect to
%% {port, 389},
%% LDAP connection timeout, in milliseconds or 'infinity'
%% {timeout, infinity},
%% Enable logging of LDAP queries.
%% One of
%% - false (no logging is performed)
%% - true (verbose logging of the logic used by the plugin)
%% - network (as true, but additionally logs LDAP network traffic)
%% Defaults to false.
%% {log, false},
%% Authentication
%% ==============
%% Pattern to convert the username given through AMQP to a DN before
%% binding
%% {user_dn_pattern, "cn=${username},ou=People,dc=example,dc=com"},
%% Alternatively, you can convert a username to a Distinguished
%% Name via an LDAP lookup after binding. See the documentation for
%% full details.
%% When converting a username to a dn via a lookup, set these to
%% the name of the attribute that represents the user name, and the
%% base DN for the lookup query.
%% {dn_lookup_attribute, "userPrincipalName"},
%% {dn_lookup_base, "DC=gopivotal,DC=com"},
%% Controls how to bind for authorisation queries and also to
%% retrieve the details of users logging in without presenting a
%% password (e.g., SASL EXTERNAL).
%% One of
%% - as_user (to bind as the authenticated user - requires a password)
%% - anon (to bind anonymously)
%% - {UserDN, Password} (to bind with a specified user name and password)
%% Defaults to 'as_user'.
%% {other_bind, as_user},
%% Authorisation
%% =============
%% The LDAP plugin can perform a variety of queries against your
%% LDAP server to determine questions of authorisation. See
%% http://www.rabbitmq.com/ldap.html#authorisation for more
%% information.
%% Set the query to use when determining vhost access
%% {vhost_access_query, {in_group,
%% "ou=${vhost}-users,ou=vhosts,dc=example,dc=com"}},
%% Set the query to use when determining resource (e.g., queue) access
%% {resource_access_query, {constant, true}},
%% Set queries to determine which tags a user has
%% {tag_queries, []}
下载Golgang运行amqp协议的包,在Rabbitmq官网上有提供现在的golang包来使用amqp协议与Rabbitmq交互 。
lion@node1:~$ go get github.com/streadway/amqp
5.1、使用Golang来发送第一个hello idoall.org
package main
import (
const (
uri = "amqp://guest:guest@localhost:5672/"
//Durable AMQP exchange name
exchangeName = ""
//Durable AMQP queue name
queueName = "test-idoall-queues"
//Body of message
bodyMsg string = "hello idoall.org"
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
panic(fmt.Sprintf("%s: %s", msg, err))
func main(){
publish(uri, exchangeName, queueName, bodyMsg)
log.Printf("published %dB OK", len(bodyMsg))
//@amqpURI, amqp的地址
//@exchange, exchange的名称
//@queue, queue的名称
//@body, 主体内容
func publish(amqpURI string, exchange string, queue string, body string){
log.Printf("dialing %q", amqpURI)
connection, err := amqp.Dial(amqpURI)
failOnError(err, "Failed to connect to RabbitMQ")
defer connection.Close()
log.Printf("got Connection, getting Channel")
channel, err := connection.Channel()
failOnError(err, "Failed to open a channel")
defer channel.Close()
log.Printf("got queue, declaring %q", queue)
q, err := channel.QueueDeclare(
queueName, // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
failOnError(err, "Failed to declare a queue")
log.Printf("declared queue, publishing %dB body (%q)", len(body), body)
// Producer只能发送到exchange,它是不能直接发送到queue的。
// 现在我们使用默认的exchange(名字是空字符)。这个默认的exchange允许我们发送给指定的queue。
// routing_key就是指定的queue名字。
err = channel.Publish(
exchange, // exchange
q.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing {
Headers: amqp.Table{},
ContentType: "text/plain",
ContentEncoding: "",
Body: []byte(body),
failOnError(err, "Failed to publish a message")
package main
import (
const (
uri = "amqp://guest:guest@localhost:5672/"
//Durable AMQP exchange nam
exchangeName = ""
//Durable AMQP queue name
queueName = "test-idoall-queues"
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
panic(fmt.Sprintf("%s: %s", msg, err))
func main(){
consumer(uri, exchangeName, queueName)
//@amqpURI, amqp的地址
//@exchange, exchange的名称
//@queue, queue的名称
func consumer(amqpURI string, exchange string, queue string){
log.Printf("dialing %q", amqpURI)
connection, err := amqp.Dial(amqpURI)
failOnError(err, "Failed to connect to RabbitMQ")
defer connection.Close()
log.Printf("got Connection, getting Channel")
channel, err := connection.Channel()
failOnError(err, "Failed to open a channel")
defer channel.Close()
log.Printf("got queue, declaring %q", queue)
q, err := channel.QueueDeclare(
queueName, // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
failOnError(err, "Failed to declare a queue")
log.Printf("Queue bound to Exchange, starting Consume")
msgs, err := channel.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
lion@node1:~/_code/_rabbitmq/_golang$ go run producer_hello.go
2016/07/23 02:29:51 dialing "amqp://guest:guest@localhost:5672/"
2016/07/23 02:29:51 got Connection, getting Channel
2016/07/23 02:29:51 got queue, declaring "test-idoall-queues"
2016/07/23 02:29:51 declared queue, publishing 16B body ("hello idoall.org")
2016/07/23 02:29:51 published 16B OK
lion@node1:~/_code/_rabbitmq/_golang$ rabbitmqctl list_queues
Listing queues ...
test-idoall-queues 1
Console2(运行consumer)打印消息到屏幕,可以看到刚才我们通过producer发送的消息hello idoall.org
lion@node1:~/_code/_rabbitmq/_golang$ go run consumer_hello.go
2016/07/23 03:33:14 dialing "amqp://guest:guest@localhost:5672/"
2016/07/23 03:33:14 got Connection, getting Channel
2016/07/23 03:33:14 got queue, declaring "test-idoall-queues"
2016/07/23 03:33:14 Queue bound to Exchange, starting Consume
2016/07/23 03:33:14 [*] Waiting for messages. To exit press CTRL+C
2016/07/23 03:33:14 Received a message: hello idoall.org
package main
import (
const (
uri = "amqp://guest:guest@localhost:5672/"
//Durable AMQP exchange name
exchangeName = ""
//Durable AMQP queue name
queueName = "test-idoall-queues-task"
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
panic(fmt.Sprintf("%s: %s", msg, err))
func main(){
bodyMsg := bodyFrom(os.Args)
publish(uri, exchangeName, queueName, bodyMsg)
log.Printf("published %dB OK", len(bodyMsg))
func bodyFrom(args []string) string {
var s string
if (len(args) < 2) || os.Args[1] == "" {
s = "hello idoall.org"
} else {
s = strings.Join(args[1:], " ")
return s
//@amqpURI, amqp的地址
//@exchange, exchange的名称
//@queue, queue的名称
//@body, 主体内容
func publish(amqpURI string, exchange string, queue string, body string){
log.Printf("dialing %q", amqpURI)
connection, err := amqp.Dial(amqpURI)
failOnError(err, "Failed to connect to RabbitMQ")
defer connection.Close()
log.Printf("got Connection, getting Channel")
channel, err := connection.Channel()
failOnError(err, "Failed to open a channel")
defer channel.Close()
log.Printf("got queue, declaring %q", queue)
q, err := channel.QueueDeclare(
queueName, // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
failOnError(err, "Failed to declare a queue")
log.Printf("declared queue, publishing %dB body (%q)", len(body), body)
// Producer只能发送到exchange,它是不能直接发送到queue的。
// 现在我们使用默认的exchange(名字是空字符)。这个默认的exchange允许我们发送给指定的queue。
// routing_key就是指定的queue名字。
err = channel.Publish(
exchange, // exchange
q.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing {
Headers: amqp.Table{},
ContentType: "text/plain",
ContentEncoding: "",
Body: []byte(body),
failOnError(err, "Failed to publish a message")
package main
import (
const (
uri = "amqp://guest:guest@localhost:5672/"
//Durable AMQP exchange nam
exchangeName = ""
//Durable AMQP queue name
queueName = "test-idoall-queues-task"
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
panic(fmt.Sprintf("%s: %s", msg, err))
func main(){
consumer(uri, exchangeName, queueName)
//@amqpURI, amqp的地址
//@exchange, exchange的名称
//@queue, queue的名称
func consumer(amqpURI string, exchange string, queue string){
log.Printf("dialing %q", amqpURI)
connection, err := amqp.Dial(amqpURI)
failOnError(err, "Failed to connect to RabbitMQ")
defer connection.Close()
log.Printf("got Connection, getting Channel")
channel, err := connection.Channel()
failOnError(err, "Failed to open a channel")
defer channel.Close()
log.Printf("got queue, declaring %q", queue)
q, err := channel.QueueDeclare(
queueName, // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
failOnError(err, "Failed to declare a queue")
log.Printf("Queue bound to Exchange, starting Consume")
msgs, err := channel.Consume(
q.Name, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
dot_count := bytes.Count(d.Body, []byte("."))
t := time.Duration(dot_count)
time.Sleep(t * time.Second)
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
lion@node1:~/_code/_rabbitmq/_golang$ go run consumer_task.go
2016/07/23 10:11:40 dialing "amqp://guest:guest@localhost:5672/"
2016/07/23 10:11:40 got Connection, getting Channel
2016/07/23 10:11:40 got queue, declaring "test-idoall-queues-task"
2016/07/23 10:11:40 Queue bound to Exchange, starting Consume
2016/07/23 10:11:40 [*] Waiting for messages. To exit press CTRL+C
lion@node1:~/_code/_rabbitmq/_golang$ go run consumer_task.go
2016/07/23 10:11:40 dialing "amqp://guest:guest@localhost:5672/"
2016/07/23 10:11:40 got Connection, getting Channel
2016/07/23 10:11:40 got queue, declaring "test-idoall-queues-task"
2016/07/23 10:11:40 Queue bound to Exchange, starting Consume
2016/07/23 10:11:40 [*] Waiting for messages. To exit press CTRL+C
这个时候我们使用Producer 来 Publish Message:
lion@node1:~/_code/_rabbitmq/_golang$ go run producer_task.go First message. && go run producer_task.go Second message.. && go run producer_task.go Third message... && go run producer_task.go Fourth message.... && go run producer_task.go Fifth message.....
2016/07/23 10:17:13 dialing "amqp://guest:guest@localhost:5672/"
2016/07/23 10:17:13 got Connection, getting Channel
2016/07/23 10:17:13 got queue, declaring "test-idoall-queues-task"
2016/07/23 10:17:13 declared queue, publishing 14B body ("First message.")
2016/07/23 10:17:13 published 14B OK
2016/07/23 10:17:14 dialing "amqp://guest:guest@localhost:5672/"
2016/07/23 10:17:14 got Connection, getting Channel
2016/07/23 10:17:14 got queue, declaring "test-idoall-queues-task"
2016/07/23 10:17:14 declared queue, publishing 16B body ("Second message..")
2016/07/23 10:17:14 published 16B OK
2016/07/23 10:17:15 dialing "amqp://guest:guest@localhost:5672/"
2016/07/23 10:17:15 got Connection, getting Channel
2016/07/23 10:17:15 got queue, declaring "test-idoall-queues-task"
2016/07/23 10:17:15 declared queue, publishing 16B body ("Third message...")
2016/07/23 10:17:15 published 16B OK
2016/07/23 10:17:16 dialing "amqp://guest:guest@localhost:5672/"
2016/07/23 10:17:16 got Connection, getting Channel
2016/07/23 10:17:16 got queue, declaring "test-idoall-queues-task"
2016/07/23 10:17:16 declared queue, publishing 18B body ("Fourth message....")
2016/07/23 10:17:16 published 18B OK
2016/07/23 10:17:16 dialing "amqp://guest:guest@localhost:5672/"
2016/07/23 10:17:16 got Connection, getting Channel
2016/07/23 10:17:16 got queue, declaring "test-idoall-queues-task"
2016/07/23 10:17:16 declared queue, publishing 18B body ("Fifth message.....")
2016/07/23 10:17:16 published 18B OK
lion@node1:~/_code/_rabbitmq/_golang$ go run consumer_task.go
2016/07/23 10:11:21 dialing "amqp://guest:guest@localhost:5672/"
2016/07/23 10:11:21 got Connection, getting Channel
2016/07/23 10:11:21 got queue, declaring "test-idoall-queues-task"
2016/07/23 10:11:21 Queue bound to Exchange, starting Consume
2016/07/23 10:11:21 [*] Waiting for messages. To exit press CTRL+C
2016/07/23 10:17:13 Received a message: First message.
2016/07/23 10:17:14 Done
2016/07/23 10:17:15 Received a message: Third message...
2016/07/23 10:17:18 Done
2016/07/23 10:17:18 Received a message: Fifth message.....
2016/07/23 10:17:23 Done
lion@node1:~/_code/_rabbitmq/_golang$ go run consumer_task.go
2016/07/23 10:11:40 dialing "amqp://guest:guest@localhost:5672/"
2016/07/23 10:11:40 got Connection, getting Channel
2016/07/23 10:11:40 got queue, declaring "test-idoall-queues-task"
2016/07/23 10:11:40 Queue bound to Exchange, starting Consume
2016/07/23 10:11:40 [*] Waiting for messages. To exit press CTRL+C
2016/07/23 10:17:14 Received a message: Second message..
2016/07/23 10:17:16 Done
2016/07/23 10:17:16 Received a message: Fourth message....
2016/07/23 10:17:20 Done
默认情况下,RabbitMQ 会顺序的分发每个Message。当每个收到ack后,会将该Message删除,然后将下一个Message分发到下一个Consumer。这种分发方式叫做round-robin,也叫消息轮询
5.3、Message acknowledgment 消息确认
每个Consumer可能需要一段时间才能处理完收到的数据。如果在这个过程中,Consumer出错了,异常退出了,而数据还没有处理完成,那么非常不幸,这段数据就丢失了。因为我们的代码,一旦RabbitMQ Server发送给Consumer消息后,会立即把这个Message标记为完成,然后从queue中删除。我们将无法再操作这个尚未处理完成的消息。
package main
import (
const (
uri = "amqp://guest:guest@localhost:5672/"
//Durable AMQP exchange name
exchangeName = ""
//Durable AMQP queue name
queueName = "test-idoall-queues-acknowledgments"
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
panic(fmt.Sprintf("%s: %s", msg, err))
func main(){
bodyMsg := bodyFrom(os.Args)
publish(uri, exchangeName, queueName, bodyMsg)
log.Printf("published %dB OK", len(bodyMsg))
func bodyFrom(args []string) string {
var s string
if (len(args) < 2) || os.Args[1] == "" {
s = "hello idoall.org"
} else {
s = strings.Join(args[1:], " ")
return s
//@amqpURI, amqp的地址
//@exchange, exchange的名称
//@queue, queue的名称
//@body, 主体内容
func publish(amqpURI string, exchange string, queue string, body string){
log.Printf("dialing %q", amqpURI)
connection, err := amqp.Dial(amqpURI)
failOnError(err, "Failed to connect to RabbitMQ")
defer connection.Close()
log.Printf("got Connection, getting Channel")
channel, err := connection.Channel()
failOnError(err, "Failed to open a channel")
defer channel.Close()
log.Printf("got queue, declaring %q", queue)
q, err := channel.QueueDeclare(
queueName, // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
failOnError(err, "Failed to declare a queue")
log.Printf("declared queue, publishing %dB body (%q)", len(body), body)
// Producer只能发送到exchange,它是不能直接发送到queue的。
// 现在我们使用默认的exchange(名字是空字符)。这个默认的exchange允许我们发送给指定的queue。
// routing_key就是指定的queue名字。
err = channel.Publish(
exchange, // exchange
q.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing {
Headers: amqp.Table{},
ContentType: "text/plain",
ContentEncoding: "",
Body: []byte(body),
failOnError(err, "Failed to publish a message")
package main
import (
const (
uri = "amqp://guest:guest@localhost:5672/"
//Durable AMQP exchange nam
exchangeName = ""
//Durable AMQP queue name
queueName = "test-idoall-queues-acknowledgments"
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
panic(fmt.Sprintf("%s: %s", msg, err))
func main(){
consumer(uri, exchangeName, queueName)
//@amqpURI, amqp的地址
//@exchange, exchange的名称
//@queue, queue的名称
func consumer(amqpURI string, exchange string, queue string){
log.Printf("dialing %q", amqpURI)
connection, err := amqp.Dial(amqpURI)
failOnError(err, "Failed to connect to RabbitMQ")
defer connection.Close()
log.Printf("got Connection, getting Channel")
channel, err := connection.Channel()
failOnError(err, "Failed to open a channel")
defer channel.Close()
log.Printf("got queue, declaring %q", queue)
q, err := channel.QueueDeclare(
queueName, // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
failOnError(err, "Failed to declare a queue")
log.Printf("Queue bound to Exchange, starting Consume")
msgs, err := channel.Consume(
q.Name, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
dot_count := bytes.Count(d.Body, []byte("."))
t := time.Duration(dot_count)
time.Sleep(t * time.Second)
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
lion@node1:~/_code/_rabbitmq/_golang$ go run producer_acknowledgments.go First message. && go run producer_acknowledgments.go Second message.. && go run producer_acknowledgments.go Third message... && go run producer_acknowledgments.go Fourth message.... && go run producer_acknowledgments.go Fifth message.....
2016/07/23 21:41:40 dialing "amqp://guest:guest@localhost:5672/"
2016/07/23 21:41:40 got Connection, getting Channel
2016/07/23 21:41:40 got queue, declaring "test-idoall-queues-acknowledgments"
2016/07/23 21:41:40 declared queue, publishing 14B body ("First message.")
2016/07/23 21:41:40 published 14B OK
2016/07/23 21:41:41 dialing "amqp://guest:guest@localhost:5672/"
2016/07/23 21:41:41 got Connection, getting Channel
2016/07/23 21:41:41 got queue, declaring "test-idoall-queues-acknowledgments"
2016/07/23 21:41:41 declared queue, publishing 16B body ("Second message..")
2016/07/23 21:41:41 published 16B OK
2016/07/23 21:41:41 dialing "amqp://guest:guest@localhost:5672/"
2016/07/23 21:41:41 got Connection, getting Channel
2016/07/23 21:41:41 got queue, declaring "test-idoall-queues-acknowledgments"
2016/07/23 21:41:41 declared queue, publishing 16B body ("Third message...")
2016/07/23 21:41:41 published 16B OK
2016/07/23 21:41:42 dialing "amqp://guest:guest@localhost:5672/"
2016/07/23 21:41:42 got Connection, getting Channel
2016/07/23 21:41:42 got queue, declaring "test-idoall-queues-acknowledgments"
2016/07/23 21:41:42 declared queue, publishing 18B body ("Fourth message....")
2016/07/23 21:41:42 published 18B OK
2016/07/23 21:41:43 dialing "amqp://guest:guest@localhost:5672/"
2016/07/23 21:41:43 got Connection, getting Channel
2016/07/23 21:41:43 got queue, declaring "test-idoall-queues-acknowledgments"
2016/07/23 21:41:43 declared queue, publishing 18B body ("Fifth message.....")
2016/07/23 21:41:43 published 18B OK
lion@node1:~/_code/_rabbitmq/_golang$ rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues ...
test-idoall-queues-task 0 0
test-idoall-queues 0 0
test-idoall-queues-acknowledgments 5 0
lion@node1:~/_code/_rabbitmq/_golang$ go run consumer_acknowledgments.go
2016/07/23 21:56:35 dialing "amqp://guest:guest@localhost:5672/"
2016/07/23 21:56:35 got Connection, getting Channel
2016/07/23 21:56:35 got queue, declaring "test-idoall-queues-acknowledgments"
2016/07/23 21:56:35 Queue bound to Exchange, starting Consume
2016/07/23 21:56:35 [*] Waiting for messages. To exit press CTRL+C
2016/07/23 21:56:35 Received a message: First message.
2016/07/23 21:56:36 Done
2016/07/23 21:56:36 Received a message: Second message..
2016/07/23 21:56:38 Done
2016/07/23 21:56:38 Received a message: Third message...
^Csignal: interrupt
lion@node1:~/_code/_rabbitmq/_golang$ rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues ...
test-idoall-queues-task 0 0
test-idoall-queues 0 0
test-idoall-queues-acknowledgments 3 0
5.4、Message durability消息持久化
如果服务器死机或程序 crash了,数据仍然会丢失。为了确保消息不会丢失,我们需要将queue和Message做持久化操作。
将durable设置为true可以做持久化处理(生产者和消息者的代码里都要设置),如果是已经存在的一个queue 没有设置过持久化,再重新设置是不起作用的,我们需要重新为queue设置一个名字。
package main
import (
const (
uri = "amqp://guest:guest@localhost:5672/"
//Durable AMQP exchange name
exchangeName = ""
//Durable AMQP queue name
queueName = "test-idoall-queues-durability"
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
panic(fmt.Sprintf("%s: %s", msg, err))
func main(){
bodyMsg := bodyFrom(os.Args)
publish(uri, exchangeName, queueName, bodyMsg)
log.Printf("published %dB OK", len(bodyMsg))
func bodyFrom(args []string) string {
var s string
if (len(args) < 2) || os.Args[1] == "" {
s = "hello idoall.org"
} else {
s = strings.Join(args[1:], " ")
return s
//@amqpURI, amqp的地址
//@exchange, exchange的名称
//@queue, queue的名称
//@body, 主体内容
func publish(amqpURI string, exchange string, queue string, body string){
log.Printf("dialing %q", amqpURI)
connection, err := amqp.Dial(amqpURI)
failOnError(err, "Failed to connect to RabbitMQ")
defer connection.Close()
log.Printf("got Connection, getting Channel")
channel, err := connection.Channel()
failOnError(err, "Failed to open a channel")
defer channel.Close()
log.Printf("got queue, declaring %q", queue)
q, err := channel.QueueDeclare(
queueName, // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
failOnError(err, "Failed to declare a queue")
log.Printf("declared queue, publishing %dB body (%q)", len(body), body)
// Producer只能发送到exchange,它是不能直接发送到queue的。
// 现在我们使用默认的exchange(名字是空字符)。这个默认的exchange允许我们发送给指定的queue。
// routing_key就是指定的queue名字。
err = channel.Publish(
exchange, // exchange
q.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing {
Headers: amqp.Table{},
DeliveryMode: amqp.Persistent,
ContentType: "text/plain",
ContentEncoding: "",
Body: []byte(body),
failOnError(err, "Failed to publish a message")
package main
import (
const (
uri = "amqp://guest:guest@localhost:5672/"
//Durable AMQP exchange nam
exchangeName = ""
//Durable AMQP queue name
queueName = "test-idoall-queues-durability"
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
panic(fmt.Sprintf("%s: %s", msg, err))
func main(){
consumer(uri, exchangeName, queueName)
//@amqpURI, amqp的地址
//@exchange, exchange的名称
//@queue, queue的名称
func consumer(amqpURI string, exchange string, queue string){
log.Printf("dialing %q", amqpURI)
connection, err := amqp.Dial(amqpURI)
failOnError(err, "Failed to connect to RabbitMQ")
defer connection.Close()
log.Printf("got Connection, getting Channel")
channel, err := connection.Channel()
failOnError(err, "Failed to open a channel")
defer channel.Close()
log.Printf("got queue, declaring %q", queue)
q, err := channel.QueueDeclare(
queueName, // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
failOnError(err, "Failed to declare a queue")
log.Printf("Queue bound to Exchange, starting Consume")
msgs, err := channel.Consume(
q.Name, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
dot_count := bytes.Count(d.Body, []byte("."))
t := time.Duration(dot_count)
time.Sleep(t * time.Second)
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
lion@node1:~/_code/_rabbitmq/_golang$ go run producer_durability.go First message. && go run producer_durability.go Second message.. && go run producer_durability.go Third message... && go run producer_durability.go Fourth message.... && go run producer_durability.go Fifth message.....
2016/07/23 22:35:03 dialing "amqp://guest:guest@localhost:5672/"
2016/07/23 22:35:03 got Connection, getting Channel
2016/07/23 22:35:03 got queue, declaring "test-idoall-queues-durability"
2016/07/23 22:35:04 declared queue, publishing 14B body ("First message.")
2016/07/23 22:35:04 published 14B OK
2016/07/23 22:35:04 dialing "amqp://guest:guest@localhost:5672/"
2016/07/23 22:35:04 got Connection, getting Channel
2016/07/23 22:35:04 got queue, declaring "test-idoall-queues-durability"
2016/07/23 22:35:04 declared queue, publishing 16B body ("Second message..")
2016/07/23 22:35:04 published 16B OK
2016/07/23 22:35:05 dialing "amqp://guest:guest@localhost:5672/"
2016/07/23 22:35:05 got Connection, getting Channel
2016/07/23 22:35:05 got queue, declaring "test-idoall-queues-durability"
2016/07/23 22:35:05 declared queue, publishing 16B body ("Third message...")
2016/07/23 22:35:05 published 16B OK
2016/07/23 22:35:06 dialing "amqp://guest:guest@localhost:5672/"
2016/07/23 22:35:06 got Connection, getting Channel
2016/07/23 22:35:06 got queue, declaring "test-idoall-queues-durability"
2016/07/23 22:35:06 declared queue, publishing 18B body ("Fourth message....")
2016/07/23 22:35:06 published 18B OK
2016/07/23 22:35:06 dialing "amqp://guest:guest@localhost:5672/"
2016/07/23 22:35:06 got Connection, getting Channel
2016/07/23 22:35:06 got queue, declaring "test-idoall-queues-durability"
2016/07/23 22:35:06 declared queue, publishing 18B body ("Fifth message.....")
2016/07/23 22:35:06 published 18B OK
通过rabbitmqctl list_queues命令,来看下messages_unacknowledged的情况:
lion@node1:~/_code/_rabbitmq/_golang$ rabbitmqctl list_queues
Listing queues ...
test-idoall-queues-task 0
test-idoall-queues 0
test-idoall-queues-durability 5
test-idoall-queues-acknowledgments 0
lion@node1:~/_code/_rabbitmq/_golang$ rabbitmqctl stop
lion@node1:~/_code/_rabbitmq/_golang$ rabbitmq-server
RabbitMQ 3.6.3. Copyright (C) 2007-2016 Pivotal Software, Inc.
## ## Licensed under the MPL. See http://www.rabbitmq.com/
## ##
########## Logs: /home/lion/_app/rabbitmq_server-3.6.3/var/log/rabbitmq/rabbit@node1.log
###### ## /home/lion/_app/rabbitmq_server-3.6.3/var/log/rabbitmq/rabbit@node1-sasl.log
Starting broker...
completed with 6 plugins.
再次通过rabbitmqctl list_queues命令查看,可以看到消息是存在的,说明我们的持久化是成功的
lion@node1:~/_code/_rabbitmq/_golang$ rabbitmqctl list_queues
Listing queues ...
test-idoall-queues-durability 5
5.5、Fair dispatch 公平分发
上面的,分发机制不是那么优雅。默认状态下,RabbitMQ将第n个Message分发给第n个Consumer。当然n是取余后的。它不管Consumer是否还有unacked Message,只是按照这个默认机制进行分发。
通过 ch.Qos 方法设置预读取消息prefetch count=1 。这样RabbitMQ就会使得每个Consumer在同一个时间点最多处理一个Message。换句话说,在接收到该Consumer的ack前,他它不会将新的Message分发给它。
package main
import (
const (
uri = "amqp://guest:guest@localhost:5672/"
//Durable AMQP exchange name
exchangeName = ""
//Durable AMQP queue name
queueName = "test-idoall-queues-fair_dispatch"
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
panic(fmt.Sprintf("%s: %s", msg, err))
func main(){
bodyMsg := bodyFrom(os.Args)
publish(uri, exchangeName, queueName, bodyMsg)
log.Printf("published %dB OK", len(bodyMsg))
func bodyFrom(args []string) string {
var s string
if (len(args) < 2) || os.Args[1] == "" {
s = "hello idoall.org"
} else {
s = strings.Join(args[1:], " ")
return s
//@amqpURI, amqp的地址
//@exchange, exchange的名称
//@queue, queue的名称
//@body, 主体内容
func publish(amqpURI string, exchange string, queue string, body string){
log.Printf("dialing %q", amqpURI)
connection, err := amqp.Dial(amqpURI)
failOnError(err, "Failed to connect to RabbitMQ")
defer connection.Close()
log.Printf("got Connection, getting Channel")
channel, err := connection.Channel()
failOnError(err, "Failed to open a channel")
defer channel.Close()
log.Printf("got queue, declaring %q", queue)
q, err := channel.QueueDeclare(
queueName, // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
failOnError(err, "Failed to declare a queue")
log.Printf("declared queue, publishing %dB body (%q)", len(body), body)
// Producer只能发送到exchange,它是不能直接发送到queue的。
// 现在我们使用默认的exchange(名字是空字符)。这个默认的exchange允许我们发送给指定的queue。
// routing_key就是指定的queue名字。
err = channel.Publish(
exchange, // exchange
q.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing {
Headers: amqp.Table{},
DeliveryMode: amqp.Persistent,
ContentType: "text/plain",
ContentEncoding: "",
Body: []byte(body),
failOnError(err, "Failed to publish a message")
package main
import (
const (
uri = "amqp://guest:guest@localhost:5672/"
//Durable AMQP exchange nam
exchangeName = ""
//Durable AMQP queue name
queueName = "test-idoall-queues-fair_dispatch"
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
panic(fmt.Sprintf("%s: %s", msg, err))
func main(){
consumer(uri, exchangeName, queueName)
//@amqpURI, amqp的地址
//@exchange, exchange的名称
//@queue, queue的名称
func consumer(amqpURI string, exchange string, queue string){
log.Printf("dialing %q", amqpURI)
connection, err := amqp.Dial(amqpURI)
failOnError(err, "Failed to connect to RabbitMQ")
defer connection.Close()
log.Printf("got Connection, getting Channel")
channel, err := connection.Channel()
failOnError(err, "Failed to open a channel")
defer channel.Close()
log.Printf("got queue, declaring %q", queue)
q, err := channel.QueueDeclare(
queueName, // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
failOnError(err, "Failed to declare a queue")
err = channel.Qos(
1, // prefetch count
0, // prefetch size
false, // global
failOnError(err, "Failed to set QoS")
log.Printf("Queue bound to Exchange, starting Consume")
msgs, err := channel.Consume(
q.Name, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
dot_count := bytes.Count(d.Body, []byte("."))
t := time.Duration(dot_count)
time.Sleep(t * time.Second)
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
lion@node1:~/_code/_rabbitmq/_golang$ go run producer_fair_dispatch.go First message. && go run producer_fair_dispatch.go Second message.. && go run producer_fair_dispatch.go Third message... && go run producer_fair_dispatch.go Fourth message.... && go run producer_fair_dispatch.go Fifth message.....
2016/07/23 23:09:24 dialing "amqp://guest:guest@localhost:5672/"
2016/07/23 23:09:24 got Connection, getting Channel
2016/07/23 23:09:24 got queue, declaring "test-idoall-queues-fair_dispatch"
2016/07/23 23:09:24 declared queue, publishing 14B body ("First message.")
2016/07/23 23:09:24 published 14B OK
2016/07/23 23:09:24 dialing "amqp://guest:guest@localhost:5672/"
2016/07/23 23:09:24 got Connection, getting Channel
2016/07/23 23:09:24 got queue, declaring "test-idoall-queues-fair_dispatch"
2016/07/23 23:09:24 declared queue, publishing 16B body ("Second message..")
2016/07/23 23:09:24 published 16B OK
2016/07/23 23:09:25 dialing "amqp://guest:guest@localhost:5672/"
2016/07/23 23:09:25 got Connection, getting Channel
2016/07/23 23:09:25 got queue, declaring "test-idoall-queues-fair_dispatch"
2016/07/23 23:09:25 declared queue, publishing 16B body ("Third message...")
2016/07/23 23:09:25 published 16B OK
2016/07/23 23:09:26 dialing "amqp://guest:guest@localhost:5672/"
2016/07/23 23:09:26 got Connection, getting Channel
2016/07/23 23:09:26 got queue, declaring "test-idoall-queues-fair_dispatch"
2016/07/23 23:09:26 declared queue, publishing 18B body ("Fourth message....")
2016/07/23 23:09:26 published 18B OK
2016/07/23 23:09:27 dialing "amqp://guest:guest@localhost:5672/"
2016/07/23 23:09:27 got Connection, getting Channel
2016/07/23 23:09:27 got queue, declaring "test-idoall-queues-fair_dispatch"
2016/07/23 23:09:27 declared queue, publishing 18B body ("Fifth message.....")
2016/07/23 23:09:27 published 18B OK
lion@node1:~/_code/_rabbitmq/_golang$ go run consumer_fair_dispatch.go
2016/07/23 23:10:47 dialing "amqp://guest:guest@localhost:5672/"
2016/07/23 23:10:47 got Connection, getting Channel
2016/07/23 23:10:47 got queue, declaring "test-idoall-queues-fair_dispatch"
2016/07/23 23:10:47 Queue bound to Exchange, starting Consume
2016/07/23 23:10:47 [*] Waiting for messages. To exit press CTRL+C
2016/07/23 23:10:47 Received a message: First message.
2016/07/23 23:10:48 Done
2016/07/23 23:10:48 Received a message: Second message..
2016/07/23 23:10:50 Done
2016/07/23 23:10:50 Received a message: Fourth message....
2016/07/23 23:10:54 Done
lion@node1:~/_code/_rabbitmq/_golang$ go run consumer_fair_dispatch.go
2016/07/23 23:10:49 dialing "amqp://guest:guest@localhost:5672/"
2016/07/23 23:10:49 got Connection, getting Channel
2016/07/23 23:10:49 got queue, declaring "test-idoall-queues-fair_dispatch"
2016/07/23 23:10:49 Queue bound to Exchange, starting Consume
2016/07/23 23:10:49 [*] Waiting for messages. To exit press CTRL+C
2016/07/23 23:10:49 Received a message: Third message...
2016/07/23 23:10:52 Done
2016/07/23 23:10:52 Received a message: Fifth message.....
2016/07/23 23:10:57 Done
基于AMQP的更多通道和消息属性,可以浏览AMQP API参考
5.6、Exchanges & Bindings
RabbitMQ 的Messaging Model就是Producer并不会直接发送Message到queue。实际上,Producer并不知道它发送的Message是否已经到达queue。
Producer发送的Message实际上是发到了Exchange中。它的功能也很简单:从Producer接收Message,然后投递到queue中。Exchange需要知道如何处理Message,是把它放到一个queue中,还是放到多个queue中?这个rule是通过Exchange 的类型定义的。
我们知道有三种类型的Exchange:direct,,topic,headers 和fanout。fanout就是广播模式,会将所有的Message都放到它所知道的queue中。
lion@node1:~/_code/_rabbitmq/_golang$ rabbitmqctl list_exchanges
Listing exchanges ...
amq.direct direct
amq.fanout fanout
amq.match headers
amq.headers headers
amq.rabbitmq.trace topic
amq.topic topic
amq.rabbitmq.log topic
注意:amq.* 是RabbitMQ默认创建的。
我们假设做一个日志系统,其中一个运行的接收程序Consumer发到消息后写入到磁盘中,同时, 另一个Consumer将收到的日志输出到屏幕上。
package main
import (
const (
uri = "amqp://guest:guest@localhost:5672/"
//Durable AMQP exchange name
exchangeName = "test-idoall-exchange-logs"
//Exchange type - direct|fanout|topic|x-custom
exchangeType = "fanout"
//AMQP routing key
routingKey = ""
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
panic(fmt.Sprintf("%s: %s", msg, err))
func main(){
bodyMsg := bodyFrom(os.Args)
publish(uri, exchangeName, exchangeType, routingKey, bodyMsg)
log.Printf("published %dB OK", len(bodyMsg))
func bodyFrom(args []string) string {
var s string
if (len(args) < 2) || os.Args[1] == "" {
s = "hello idoall.org"
} else {
s = strings.Join(args[1:], " ")
return s
//@amqpURI, amqp的地址
//@exchange, exchange的名称
//@exchangeType, exchangeType的类型direct|fanout|topic
//@routingKey, routingKey的名称
//@body, 主体内容
func publish(amqpURI string, exchange string, exchangeType string, routingKey string, body string){
log.Printf("dialing %q", amqpURI)
connection, err := amqp.Dial(amqpURI)
failOnError(err, "Failed to connect to RabbitMQ")
defer connection.Close()
log.Printf("got Connection, getting Channel")
channel, err := connection.Channel()
failOnError(err, "Failed to open a channel")
defer channel.Close()
log.Printf("got Channel, declaring %q Exchange (%q)", exchangeType, exchange)
err = channel.ExchangeDeclare(
exchange, // name
exchangeType, // type
true, // durable
false, // auto-deleted
false, // internal
false, // noWait
nil, // arguments
failOnError(err, "Failed to declare a queue")
// 发布消息
log.Printf("declared queue, publishing %dB body (%q)", len(body), body)
err = channel.Publish(
exchange, // exchange
routingKey, // routing key
false, // mandatory
false, // immediate
amqp.Publishing {
Headers: amqp.Table{},
ContentType: "text/plain",
ContentEncoding: "",
Body: []byte(body),
failOnError(err, "Failed to publish a message")
package main
import (
const (
uri = "amqp://guest:guest@localhost:5672/"
//Durable AMQP exchange name
exchangeName = "test-idoall-exchange-logs"
//Exchange type - direct|fanout|topic|x-custom
exchangeType = "fanout"
//AMQP binding key
bindingKey = ""
//Durable AMQP queue name
queueName = ""
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
panic(fmt.Sprintf("%s: %s", msg, err))
func main(){
consumer(uri, exchangeName, exchangeType, queueName, bindingKey)
//@amqpURI, amqp的地址
//@exchange, exchange的名称
//@exchangeType, exchangeType的类型direct|fanout|topic
//@queue, queue的名称
//@key , 绑定的key名称
func consumer(amqpURI string, exchange string, exchangeType string, queue string, key string){
log.Printf("dialing %q", amqpURI)
connection, err := amqp.Dial(amqpURI)
failOnError(err, "Failed to connect to RabbitMQ")
defer connection.Close()
log.Printf("got Connection, getting Channel")
channel, err := connection.Channel()
failOnError(err, "Failed to open a channel")
defer channel.Close()
log.Printf("got Channel, declaring Exchange (%q)", exchange)
err = channel.ExchangeDeclare(
exchange, // name of the exchange
exchangeType, // type
true, // durable
false, // delete when complete
false, // internal
false, // noWait
nil, // arguments
failOnError(err, "Exchange Declare:")
q, err := channel.QueueDeclare(
queueName, // name
false, // durable
false, // delete when unused
true, // exclusive 当Consumer关闭连接时,这个queue要被deleted
false, // no-wait
nil, // arguments
failOnError(err, "Failed to declare a queue")
err = channel.QueueBind(
q.Name, // name of the queue
key, // bindingKey
exchange, // sourceExchange
false, // noWait
nil, // arguments
failOnError(err, "Failed to bind a queue")
log.Printf("Queue bound to Exchange, starting Consume")
msgs, err := channel.Consume(
q.Name, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf(" [x] %s", d.Body)
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
在AMQP客户端 ,当routing key为空的时候, 自动创建一个随机的queue,同时设置exclusive为true时,当这个Consumer关闭链接 时,会删除这个queue。
过程中可以使用rabbitmqctl list_bindings命令来查看绑定的列表
lion@node1:~/_code/_rabbitmq/_golang$ go run consumer_exchange_logs.go &> consumer_exchange_logs.log
lion@node1:~/_code/_rabbitmq/_golang$ go run consumer_exchange_logs.go
lion@node1:~/_code/_rabbitmq/_golang$ go run producer_exchange_logs.go
2016/07/24 02:21:49 dialing "amqp://guest:guest@localhost:5672/"
2016/07/24 02:21:49 got Connection, getting Channel
2016/07/24 02:21:49 got Channel, declaring "fanout" Exchange ("test-idoall-exchange-logs")
2016/07/24 02:21:49 declared queue, publishing 16B body ("hello idoall.org")
2016/07/24 02:21:49 published 16B OK
这时可以使用rabbitmqctl list_bindings来查看我们的绑定信息,可以看到queueu的名字是随机的
lion@node1:~/_code/_rabbitmq/_golang$ rabbitmqctl list_bindings
Listing bindings ...
exchange amq.gen-D2AnzGsLUMhJCPk7YxgUUw queue amq.gen-D2AnzGsLUMhJCPk7YxgUUw []
exchange amq.gen-GC4VDS3mxsAOTEqii_WsWw queue amq.gen-GC4VDS3mxsAOTEqii_WsWw []
test-idoall-exchange-logs exchange amq.gen-D2AnzGsLUMhJCPk7YxgUUw queue []
test-idoall-exchange-logs exchange amq.gen-GC4VDS3mxsAOTEqii_WsWw queue []
lion@node1:~/_code/_rabbitmq/_golang$ cat consumer_exchange_logs.log
2016/07/24 02:25:17 dialing "amqp://guest:guest@localhost:5672/"
2016/07/24 02:25:17 got Connection, getting Channel
2016/07/24 02:25:17 got Channel, declaring Exchange ("test-idoall-exchange-logs")
2016/07/24 02:25:17 Queue bound to Exchange, starting Consume
2016/07/24 02:25:17 [*] Waiting for messages. To exit press CTRL+C
signal: interrupt
5.7、Direct exchange
RabbitMQ支持同一个binding key绑定到多个queue中。Direct exchange的算法就是通过binding key来做匹配的。
package main
import (
const (
uri = "amqp://guest:guest@localhost:5672/"
//Durable AMQP exchange name
exchangeName = "test-idoall-exchange-direct-logs"
//Exchange type - direct|fanout|topic|x-custom
exchangeType = "direct"
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
panic(fmt.Sprintf("%s: %s", msg, err))
func main(){
bodyMsg := bodyFrom(os.Args)
publish(uri, exchangeName, exchangeType, bodyMsg)
log.Printf("published %dB OK", len(bodyMsg))
func bodyFrom(args []string) string {
var s string
if (len(args) < 3) || os.Args[2] == "" {
s = "hello idoall.org"
} else {
s = strings.Join(args[2:], " ")
return s
func severityFrom(args []string) string {
var s string
if (len(args) < 2) || os.Args[1] == "" {
s = "info"
} else {
s = os.Args[1]
return s
//@amqpURI, amqp的地址
//@exchange, exchange的名称
//@exchangeType, exchangeType的类型direct|fanout|topic
//@body, 主体内容
func publish(amqpURI string, exchange string, exchangeType string, body string){
log.Printf("dialing %q", amqpURI)
connection, err := amqp.Dial(amqpURI)
failOnError(err, "Failed to connect to RabbitMQ")
defer connection.Close()
log.Printf("got Connection, getting Channel")
channel, err := connection.Channel()
failOnError(err, "Failed to open a channel")
defer channel.Close()
log.Printf("got Channel, declaring %q Exchange (%q)", exchangeType, exchange)
err = channel.ExchangeDeclare(
exchange, // name
exchangeType, // type
true, // durable
false, // auto-deleted
false, // internal
false, // noWait
nil, // arguments
failOnError(err, "Failed to declare a queue")
// 发布消息
log.Printf("declared queue, publishing %dB body (%q)", len(body), body)
err = channel.Publish(
exchange, // exchange
severityFrom(os.Args), // routing key
false, // mandatory
false, // immediate
amqp.Publishing {
Headers: amqp.Table{},
ContentType: "text/plain",
ContentEncoding: "",
Body: []byte(body),
failOnError(err, "Failed to publish a message")
package main
import (
const (
uri = "amqp://guest:guest@localhost:5672/"
//Durable AMQP exchange name
exchangeName = "test-idoall-exchange-direct-logs"
//Exchange type - direct|fanout|topic|x-custom
exchangeType = "direct"
//AMQP binding key
bindingKey = ""
//Durable AMQP queue name
queueName = ""
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
panic(fmt.Sprintf("%s: %s", msg, err))
func main(){
consumer(uri, exchangeName, exchangeType, queueName, bindingKey)
//@amqpURI, amqp的地址
//@exchange, exchange的名称
//@exchangeType, exchangeType的类型direct|fanout|topic
//@queue, queue的名称
//@key , 绑定的key名称
func consumer(amqpURI string, exchange string, exchangeType string, queue string, key string){
log.Printf("dialing %q", amqpURI)
connection, err := amqp.Dial(amqpURI)
failOnError(err, "Failed to connect to RabbitMQ")
defer connection.Close()
log.Printf("got Connection, getting Channel")
channel, err := connection.Channel()
failOnError(err, "Failed to open a channel")
defer channel.Close()
log.Printf("got Channel, declaring Exchange (%q)", exchange)
err = channel.ExchangeDeclare(
exchange, // name of the exchange
exchangeType, // type
true, // durable
false, // delete when complete
false, // internal
false, // noWait
nil, // arguments
failOnError(err, "Exchange Declare:")
q, err := channel.QueueDeclare(
queueName, // name
false, // durable
false, // delete when unused
true, // exclusive 当Consumer关闭连接时,这个queue要被deleted
false, // no-wait
nil, // arguments
failOnError(err, "Failed to declare a queue")
if len(os.Args) < 2 {
log.Printf("Usage: %s [info] [warning] [error]", os.Args[0])
for _, s := range os.Args[1:] {
log.Printf("Binding queue %s to exchange %s with routing key %s",
q.Name, exchange, s)
err = channel.QueueBind(
q.Name, // name of the queue
s, // bindingKey
exchange, // sourceExchange
false, // noWait
nil, // arguments
failOnError(err, "Failed to bind a queue")
log.Printf("Queue bound to Exchange, starting Consume")
msgs, err := channel.Consume(
q.Name, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf(" [x] %s", d.Body)
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
lion@node1:~/_code/_rabbitmq/_golang$ go run consumer_exchange_direct_logs.go warning error &> consumer_exchange_direct_logs.log
lion@node1:~/_code/_rabbitmq/_golang$ go run consumer_exchange_direct_logs.go info warning error
2016/07/24 08:48:17 dialing "amqp://guest:guest@localhost:5672/"
2016/07/24 08:48:17 got Connection, getting Channel
2016/07/24 08:48:17 got Channel, declaring Exchange ("test-idoall-exchange-direct-logs")
2016/07/24 08:48:17 Binding queue amq.gen-vE-62-Lwt4VQYjlBbMLTjQ to exchange test-idoall-exchange-direct-logs with routing key info
2016/07/24 08:48:17 Binding queue amq.gen-vE-62-Lwt4VQYjlBbMLTjQ to exchange test-idoall-exchange-direct-logs with routing key warning
2016/07/24 08:48:17 Binding queue amq.gen-vE-62-Lwt4VQYjlBbMLTjQ to exchange test-idoall-exchange-direct-logs with routing key error
2016/07/24 08:48:17 Queue bound to Exchange, starting Consume
2016/07/24 08:48:17 [*] Waiting for messages. To exit press CTRL+C
lion@node1:~/_code/_rabbitmq/_golang$ go run producer_exchange_direct_logs.go error "Error. Error" && go run producer_exchange_direct_logs.go info "Info. Info" && go run producer_exchange_direct_logs.go warning "warning. warning"
5.7、Topic exchange
对于Topic的exchange中Message的routing_key是有限制的,不能太随意。格式是以点号“.”分割的字符表。比如:”stock.usd.nyse”, “nyse.vmw”, “quick.orange.rabbit”。你可以放任意的key在routing_key中,不过长度不能超过255 bytes。
- * (星号) 代表任意 一个单词
- # (hash哈希) 0个或者多个单词
Topic exchange和其他exchange的区别,由于有”*”和”#”, Topic exchange 非常强大并且可以转化为其他的exchange:
- 如果binding_key 是 “#” – 它会接收所有的Message,不管routing_key是什么,就像是fanout exchange。
- 如果 “*”和”#”没有被使用,那么topic exchange就变成了direct exchange。
下面的代码中,我们将演示Topic的exchange使用”#”和”*”来匹配binding key。
package main
import (
const (
uri = "amqp://guest:guest@localhost:5672/"
//Durable AMQP exchange name
exchangeName = "test-idoall-exchange-direct-logs"
//Exchange type - direct|fanout|topic|x-custom
exchangeType = "fanout"
//AMQP routing key
routingKey = ""
//Durable AMQP queue name
queueName = "test-idoall-queues-direct"
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
panic(fmt.Sprintf("%s: %s", msg, err))
func main(){
bodyMsg := bodyFrom(os.Args)
publish(uri, exchangeName, exchangeType, routingKey, bodyMsg)
log.Printf("published %dB OK", len(bodyMsg))
func bodyFrom(args []string) string {
var s string
if (len(args) < 2) || os.Args[1] == "" {
s = "hello idoall.org"
} else {
s = strings.Join(args[1:], " ")
return s
//@amqpURI, amqp的地址
//@exchange, exchange的名称
//@exchangeType, exchangeType的类型direct|fanout|topic
//@routingKey, routingKey的名称
//@body, 主体内容
func publish(amqpURI string, exchange string, exchangeType string, routingKey string, body string){
log.Printf("dialing %q", amqpURI)
connection, err := amqp.Dial(amqpURI)
failOnError(err, "Failed to connect to RabbitMQ")
defer connection.Close()
log.Printf("got Connection, getting Channel")
channel, err := connection.Channel()
failOnError(err, "Failed to open a channel")
defer channel.Close()
log.Printf("got Channel, declaring %q Exchange (%q)", exchangeType, exchange)
err = channel.ExchangeDeclare(
exchange, // name
exchangeType, // type
true, // durable
false, // auto-deleted
false, // internal
false, // noWait
nil, // arguments
failOnError(err, "Failed to declare a queue")
// 发布消息
log.Printf("declared queue, publishing %dB body (%q)", len(body), body)
err = channel.Publish(
exchange, // exchange
routingKey, // routing key
false, // mandatory
false, // immediate
amqp.Publishing {
Headers: amqp.Table{},
ContentType: "text/plain",
ContentEncoding: "",
Body: []byte(body),
failOnError(err, "Failed to publish a message")
package main
import (
const (
uri = "amqp://guest:guest@localhost:5672/"
//Durable AMQP exchange name
exchangeName = "test-idoall-exchange-topic-logs"
//Exchange type - direct|fanout|topic|x-custom
exchangeType = "topic"
//AMQP binding key
bindingKey = ""
//Durable AMQP queue name
queueName = ""
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
panic(fmt.Sprintf("%s: %s", msg, err))
func main(){
consumer(uri, exchangeName, exchangeType, queueName, bindingKey)
//@amqpURI, amqp的地址
//@exchange, exchange的名称
//@exchangeType, exchangeType的类型direct|fanout|topic
//@queue, queue的名称
//@key , 绑定的key名称
func consumer(amqpURI string, exchange string, exchangeType string, queue string, key string){
log.Printf("dialing %q", amqpURI)
connection, err := amqp.Dial(amqpURI)
failOnError(err, "Failed to connect to RabbitMQ")
defer connection.Close()
log.Printf("got Connection, getting Channel")
channel, err := connection.Channel()
failOnError(err, "Failed to open a channel")
defer channel.Close()
log.Printf("got Channel, declaring Exchange (%q)", exchange)
err = channel.ExchangeDeclare(
exchange, // name of the exchange
exchangeType, // type
true, // durable
false, // delete when complete
false, // internal
false, // noWait
nil, // arguments
failOnError(err, "Exchange Declare:")
q, err := channel.QueueDeclare(
queueName, // name
false, // durable
false, // delete when unused
true, // exclusive 当Consumer关闭连接时,这个queue要被deleted
false, // no-wait
nil, // arguments
failOnError(err, "Failed to declare a queue")
err = channel.QueueBind(
q.Name, // name of the queue
key, // bindingKey
exchange, // sourceExchange
false, // noWait
nil, // arguments
failOnError(err, "Failed to bind a queue")
log.Printf("Queue bound to Exchange, starting Consume")
msgs, err := channel.Consume(
q.Name, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf(" [x] %s", d.Body)
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
lion@node1:~/_code/_rabbitmq/_golang$ go run consumer_exchange_topic_logs.go "#"
2016/07/24 09:28:29 dialing "amqp://guest:guest@localhost:5672/"
2016/07/24 09:28:29 got Connection, getting Channel
2016/07/24 09:28:29 got Channel, declaring Exchange ("test-idoall-exchange-topic-logs")
2016/07/24 09:28:29 Binding queue amq.gen-jW2-PIBg4izXpt96CynyFw to exchange test-idoall-exchange-topic-logs with routing key #
2016/07/24 09:28:29 Queue bound to Exchange, starting Consume
2016/07/24 09:28:29 [*] Waiting for messages. To exit press CTRL+C
lion@node1:~/_code/_rabbitmq/_golang$ go run consumer_exchange_topic_logs.go "kern.*"
2016/07/24 09:34:00 dialing "amqp://guest:guest@localhost:5672/"
2016/07/24 09:34:00 got Connection, getting Channel
2016/07/24 09:34:00 got Channel, declaring Exchange ("test-idoall-exchange-topic-logs")
2016/07/24 09:34:00 Binding queue amq.gen-8zYBz2uXYbWXcItJMZ3AQA to exchange test-idoall-exchange-topic-logs with routing key kern.*
2016/07/24 09:34:00 Queue bound to Exchange, starting Consume
2016/07/24 09:34:00 [*] Waiting for messages. To exit press CTRL+C
lion@node1:~/_code/_rabbitmq/_golang$ go run consumer_exchange_topic_logs.go "*.critical"
2016/07/24 09:37:21 dialing "amqp://guest:guest@localhost:5672/"
2016/07/24 09:37:21 got Connection, getting Channel
2016/07/24 09:37:21 got Channel, declaring Exchange ("test-idoall-exchange-topic-logs")
2016/07/24 09:37:21 Binding queue amq.gen-tq9QsD1i1mCps-jrqDtTTA to exchange test-idoall-exchange-topic-logs with routing key *.critical
2016/07/24 09:37:21 Queue bound to Exchange, starting Consume
2016/07/24 09:37:21 [*] Waiting for messages. To exit press CTRL+C
Console4(Consumer), 可以创建多个绑定关系:
lion@node1:~/_code/_rabbitmq/_golang$ go run consumer_exchange_topic_logs.go "kern.critical" "A critical kernel error"
2016/07/24 09:39:35 dialing "amqp://guest:guest@localhost:5672/"
2016/07/24 09:39:35 got Connection, getting Channel
2016/07/24 09:39:35 got Channel, declaring Exchange ("test-idoall-exchange-topic-logs")
2016/07/24 09:39:35 Binding queue amq.gen-vcaHyCor5bbB2NX7YQhmzA to exchange test-idoall-exchange-topic-logs with routing key kern.critical
2016/07/24 09:39:35 Binding queue amq.gen-vcaHyCor5bbB2NX7YQhmzA to exchange test-idoall-exchange-topic-logs with routing key A critical kernel error
2016/07/24 09:39:35 Queue bound to Exchange, starting Consume
2016/07/24 09:39:35 [*] Waiting for messages. To exit press CTRL+C
lion@node1:~/_code/_rabbitmq/_golang$ go run producer_exchange_topic_logs.go "kern.critical" "A critical kernel error"
2016/07/24 09:56:33 dialing "amqp://guest:guest@localhost:5672/"
2016/07/24 09:56:33 got Connection, getting Channel
2016/07/24 09:56:33 got Channel, declaring "topic" Exchange ("test-idoall-exchange-topic-logs")
2016/07/24 09:56:33 declared queue, publishing 23B body ("A critical kernel error")
2016/07/24 09:56:33 [x] Sent A critical kernel error
2016/07/24 09:56:33 published 23B OK
- persistent:消息持久性
- content_type:用来描述编码的MIME类型
- reply_to:回调queue的名字
- correlation_id:将远程RPC请求,进行关联的唯一标识
- 客户端启动时,创建一个匿名的exclusive callback queue
- 客户端发送请求时,要带两个属性reply_to(设置回调的queue)和correlation_id(唯一标识)
- 将请求发送到一个RPC queue
- RPC的server端 ,一直在等待请求,当消息到达时会对过reply_to回复到指定的queue
- 客户端在等queue从server的回调,检查 correlation_id是否一致,如果和请求时发送的一致,则做其他响应。
package main
import (
const (
uri = "amqp://guest:guest@localhost:5672/"
//Durable AMQP queue name
queueName = "rpc-queue"
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
panic(fmt.Sprintf("%s: %s", msg, err))
func main(){
publish(uri, queueName)
//@amqpURI, amqp的地址
//@queue, queue的名称
func publish(amqpURI string, queue string){
log.Printf("dialing %q", amqpURI)
connection, err := amqp.Dial(amqpURI)
failOnError(err, "Failed to connect to RabbitMQ")
defer connection.Close()
log.Printf("got Connection, getting Channel")
channel, err := connection.Channel()
failOnError(err, "Failed to open a channel")
defer channel.Close()
log.Printf("got queue, declaring %q", queue)
q,err := channel.QueueDeclare(
queue, // name
false, // durable
false, // delete when usused
false, // exclusive
false, // no-wait
nil, // arguments
failOnError(err, "Failed to declare a queue")
err = channel.Qos(
1, // prefetch count
0, // prefetch size
false, // global
failOnError(err, "Failed to set QoS")
//log.Printf("Queue bound to Exchange, starting Consume")
msgs, err := channel.Consume(
q.Name, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
// 发布消息
go func() {
for d := range msgs {
n, err := strconv.Atoi(string(d.Body))
failOnError(err, "Failed to convert body to integer")
log.Printf(" [.] server端接收到的数据是 (%d)", n)
response := n*2
err = channel.Publish(
"", // exchange
d.ReplyTo, // routing key
false, // mandatory
false, // immediate
ContentType: "text/plain",
CorrelationId: d.CorrelationId,
Body: []byte(strconv.Itoa(response)),
failOnError(err, "Failed to publish a message")
log.Printf(" [*] Awaiting RPC requests")
package main
import (
const (
uri = "amqp://guest:guest@localhost:5672/"
//Durable AMQP exchange name
exchangeName = ""
//Exchange type - direct|fanout|topic|x-custom
queueName = "rpc-queue"
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
panic(fmt.Sprintf("%s: %s", msg, err))
func randomString(l int) string {
bytes := make([]byte, l)
for i := 0; i < l; i++ {
bytes[i] = byte(randInt(65, 90))
return string(bytes)
func randInt(min int, max int) int {
return min + rand.Intn(max-min)
func bodyFrom(args []string) int {
var s string
if (len(args) < 2) || os.Args[1] == "" {
s = "30"
} else {
s = strings.Join(args[1:], " ")
n, err := strconv.Atoi(s)
failOnError(err, "Failed to convert arg to integer")
return n
func main(){
n := bodyFrom(os.Args)
log.Printf(" [x] 请求的数据是(%d)", n)
res, err := fibonacciRPC(n, uri, exchangeName, queueName)
failOnError(err, "Failed to handle RPC request")
log.Printf(" [.] 计算结果为 %d", res)
//RPC client调用方法
//@amqpURI, amqp的地址
//@exchange, exchange的名称
//@queue, queue的名称
func fibonacciRPC(n int, amqpURI string, exchange string, queue string) (res int, err error){
log.Printf("dialing %q", amqpURI)
connection, err := amqp.Dial(amqpURI)
failOnError(err, "Failed to connect to RabbitMQ")
defer connection.Close()
log.Printf("got Connection, getting Channel")
channel, err := connection.Channel()
failOnError(err, "Failed to open a channel")
defer channel.Close()
log.Printf("got queue, declaring %q", queue)
q,err := channel.QueueDeclare(
"", // name
false, // durable
false, // delete when usused
true, // exclusive
false, // no-wait
nil, // arguments
failOnError(err, "Failed to declare a queue")
log.Printf("Queue bound to Exchange, starting Consume")
msgs, err := channel.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
failOnError(err, "Failed to register a consumer")
corrId := randomString(32)
err = channel.Publish(
"", // exchange
queue, // routing key
false, // mandatory
false, // immediate
ContentType: "text/plain",
CorrelationId: corrId,
ReplyTo: q.Name,
Body: []byte(strconv.Itoa(n)),
failOnError(err, "Failed to publish a message")
for d := range msgs {
if corrId == d.CorrelationId {
res, err = strconv.Atoi(string(d.Body))
failOnError(err, "Failed to convert body to integer")
Console1(rpc server):
lion@node1:~/_code/_rabbitmq/_golang$ go run rpc_server.go
2016/07/24 11:20:32 dialing "amqp://guest:guest@localhost:5672/"
2016/07/24 11:20:32 got Connection, getting Channel
2016/07/24 11:20:32 got queue, declaring "rpc-queue"
2016/07/24 11:20:32 [*] Awaiting RPC requests
Console2(rpc client):
lion@node1:~/_code/_rabbitmq/_golang$ go run rpc_client.go 69
2016/07/24 11:24:37 [x] 请求的数据是(69)
2016/07/24 11:24:37 dialing "amqp://guest:guest@localhost:5672/"
2016/07/24 11:24:37 got Connection, getting Channel
2016/07/24 11:24:37 got queue, declaring "rpc-queue"
2016/07/24 11:24:37 Queue bound to Exchange, starting Consume
2016/07/24 11:24:37 [.] 计算结果为 138
安装Erlang过程中出现提示configure: error: No curses library functions found
lion@node1:~/$ sudo apt-get install libncurses5-dev
写作时间:2016-07-28 15:22
转载声明:可以转载, 但必须以超链接形式标明文章原始出处和作者信息及版权声明,谢谢合作!
5 thoughts on “Ubuntu14.04+RabbitMQ3.6.3+Golang的最佳实践”
topic 代码有问题的啊
Wow! At last I got a web site from where I can genuinely take helpful
data regarding my study and knowledge.
Great goods from you, man. I have understand your
stuff previous to and you are just too great. I actually like what you’ve acquired here, certainly
like what you are stating and the way in which you say it.
You make it entertaining and you still take care of to keep it sensible.
I can not wait to read far more from you. This is actually a wonderful web site.