digging into fedmsg
Fedmsg is amazing. Need to explore more!
So, how do you go about making sense of this project? Let’s see.
setup.py
This file defines several entry points that you get when you install fedmsg.
entry_points={
    'console_scripts': [
        "fedmsg-logger=fedmsg.commands.logger:logger [commands]",
        "fedmsg-tail=fedmsg.commands.tail:tail [commands]",
        "fedmsg-collectd=fedmsg.commands.collectd:collectd [commands]",
        "fedmsg-announce=fedmsg.commands.announce:announce [commands]",
        "fedmsg-trigger=fedmsg.commands.trigger:trigger [commands]",
        "fedmsg-dg-replay=fedmsg.commands.replay:replay [commands]",
        #"fedmsg-config=fedmsg.commands.config:config [commands]",
        "fedmsg-hub=fedmsg.commands.hub:hub [consumers]",
        "fedmsg-relay=fedmsg.commands.relay:relay [consumers]",
        "fedmsg-gateway=fedmsg.commands.gateway:gateway [consumers]",
        "fedmsg-irc=fedmsg.commands.ircbot:ircbot [consumers]",
    ],
    'moksha.consumer': [
        "fedmsg-dummy=fedmsg.consumers.dummy:DummyConsumer [consumers]",
        "fedmsg-relay=fedmsg.consumers.relay:RelayConsumer [consumers]",
        "fedmsg-gateway=fedmsg.consumers.gateway:GatewayConsumer [consumers]",
        "fedmsg-ircbot=fedmsg.consumers.ircbot:IRCBotConsumer [consumers]",
    ],
    'moksha.producer': [
    ],
    # fedmsg core only provides one metadata provider.
    'fedmsg.meta': [
        "logger=fedmsg.meta.logger:LoggerProcessor",
        "announce=fedmsg.meta.announce:AnnounceProcessor",
    ],
}
Each fedmsg-* console script is implemented by a module in fedmsg.commands; the part after the colon names the function in that module that runs when you invoke the script.
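To make the "name=module:function [extra]" format concrete, here is a minimal sketch of how such a spec can be taken apart and resolved to a callable. This is not how setuptools does it internally; parse_entry_point and resolve are hypothetical helpers written for this post, and the json-dumps spec is made up so the example runs without fedmsg installed.

```python
import importlib
import re

# Matches specs such as "name=package.module:attr [extra]".
ENTRY_POINT = re.compile(
    r"^\s*(?P<name>[^=\s]+)\s*=\s*(?P<module>[\w.]+)\s*:\s*(?P<attr>[\w.]+)"
    r"\s*(?:\[(?P<extras>[^\]]*)\])?\s*$"
)


def parse_entry_point(spec):
    """Split an entry point spec into its name, module, attribute and extras."""
    match = ENTRY_POINT.match(spec)
    if match is None:
        raise ValueError("not an entry point spec: %r" % spec)
    extras = match.group("extras") or ""
    return {
        "name": match.group("name"),
        "module": match.group("module"),
        "attr": match.group("attr"),
        "extras": [e.strip() for e in extras.split(",") if e.strip()],
    }


def resolve(spec):
    """Import the module named in the spec and return the attribute it points at."""
    parsed = parse_entry_point(spec)
    target = importlib.import_module(parsed["module"])
    for part in parsed["attr"].split("."):
        target = getattr(target, part)
    return target


# Parsing only: this does not import fedmsg.
parsed = parse_entry_point("fedmsg-tail=fedmsg.commands.tail:tail [commands]")

# Resolving a made-up spec that points at a stdlib function.
dumps = resolve("json-dumps=json:dumps")
```

So for fedmsg-tail, the module is fedmsg.commands.tail and the callable is tail, which is exactly where we will start reading later.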
fedmsg.d
Each file in this directory defines a dict called config, whose key-value pairs are used in the code as config parameters.
For example, relay.py has a dict with relay_outbound and relay_inbound keys whose values are the addresses that serve as the outlet and inlet ports.
config = dict(
endpoints={
"relay_outbound": [
"tcp://127.0.0.1:4001",
],
},
relay_inbound=[
"tcp://127.0.0.1:2003",
],
)
Config parameters are looked up in the following order:
- Built-in defaults
- Config file (/etc/fedmsg.d/*.py)
- Config file (./fedmsg.d/*.py)
- Command line arguments
The config.py in fedmsg looks at ./fedmsg.d (and others) and updates the config dict with the load_config function.
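The layering described above can be sketched roughly like this. It is not fedmsg's actual load_config; load_layered_config is a hypothetical helper, and it assumes that each later source simply overrides keys from the earlier ones with a shallow dict update.

```python
import glob
import os
import tempfile


def load_layered_config(defaults, config_dirs, cli_overrides):
    """Merge config sources in order; later sources win (assumed behaviour)."""
    config = dict(defaults)
    for directory in config_dirs:
        # Every *.py file is executed and its module-level `config` dict merged.
        for path in sorted(glob.glob(os.path.join(directory, "*.py"))):
            namespace = {}
            with open(path) as handle:
                exec(compile(handle.read(), path, "exec"), namespace)
            config.update(namespace.get("config", {}))
    config.update(cli_overrides)
    return config


# Demo with two throwaway directories standing in for /etc/fedmsg.d and ./fedmsg.d.
with tempfile.TemporaryDirectory() as system_dir, \
        tempfile.TemporaryDirectory() as local_dir:
    with open(os.path.join(system_dir, "relay.py"), "w") as handle:
        handle.write(
            'config = dict(relay_inbound=["tcp://127.0.0.1:2003"], timeout=5)\n'
        )
    with open(os.path.join(local_dir, "base.py"), "w") as handle:
        handle.write("config = dict(timeout=10)\n")

    merged = load_layered_config(
        defaults={"timeout": 0, "daemon": False},
        config_dirs=[system_dir, local_dir],
        cli_overrides={"daemon": True},
    )
```

Here timeout ends up as 10 (the local dir overrides the system dir) and daemon as True (the command line overrides everything).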
__init__.py
This file imports fedmsg.core and fedmsg.config to do the heavy lifting.
fedmsg.config
- Just verifies that the config is fine and returns the config dict when called by __init__.py
- Used in the __init__.py file: config = fedmsg.config.load_config([], None).
fedmsg.core
- connects to the zmq machinery:
self.publisher = self.context.socket(zmq.PUB)
set_high_water_mark(self.publisher, config)
set_tcp_keepalive(self.publisher, config)
if method == 'connect':
    self.publisher.setsockopt(zmq.LINGER, config['zmq_linger'])
- after FedMsgContext.__init__ runs, we have the __local.__context attribute, which is just an instance of FedMsgContext. This is returned when fedmsg.init is called, and fedmsg.init is called by the command files.
- also, there are some more functions defined in the __init__.py file, like publish and destroy, which just call the corresponding methods of the FedMsgContext class.
Example:
# this is from __init__.py
@API_function(doc=fedmsg.core.FedMsgContext.publish.__doc__)
def publish(topic=None, msg=None, **kw):
    return __local.__context.publish(topic, msg, **kw)
These functions are used by the commands. Example:
# this is from logger.py
fedmsg.publish(
    topic=kw['topic'],
    msg=msg,
    modname=kw['modname'])
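The "module-level functions that forward to one shared context" pattern can be sketched as below. This is a simplified stand-in, not fedmsg's code: Context is a hypothetical class that records messages instead of sending them over zmq, and the API_function decorator is left out entirely.

```python
import threading


class Context:
    """Hypothetical stand-in for FedMsgContext; it records instead of sending."""

    def __init__(self, **config):
        self.config = config
        self.sent = []

    def publish(self, topic=None, msg=None, **kw):
        self.sent.append((topic, msg))


# One context per thread, stored on a thread-local object.
_local = threading.local()


def init(**config):
    """Create the per-thread context once and return it."""
    if getattr(_local, "context", None) is not None:
        raise ValueError("already initialized")
    _local.context = Context(**config)
    return _local.context


def publish(topic=None, msg=None, **kw):
    """Module-level convenience function that forwards to the context."""
    return _local.context.publish(topic, msg, **kw)


context = init(name="relay_inbound")
publish(topic="testing", msg={"log": "hello"})
```

The caller never touches the context object directly, which is the whole point: the module itself becomes the API.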
commands dir
- The commands dir has various files that each define a command. Each of them uses fedmsg in a certain way.
- All the commands extend the BaseCommand class from fedmsg.commands.__init__.py
- The __init__ method of BaseCommand gets the config from fedmsg.config.load_config and assigns it to self.config
- Also, if the config has daemon set to True, BaseCommand “daemonizes” the command
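The shape of this class hierarchy is roughly the following. It is a stripped-down sketch, not the real BaseCommand: config loading is replaced by a plain dict argument, daemonizing is omitted, and EchoCommand and echo are hypothetical names invented for illustration.

```python
class BaseCommand:
    """Simplified base class: holds the config and drives the subclass's run()."""

    def __init__(self, config=None):
        # The real class gets this from fedmsg.config.load_config.
        self.config = dict(config or {})

    def run(self):
        raise NotImplementedError("subclasses implement run()")

    def execute(self):
        # The base class owns the entry point; subclasses only supply run().
        return self.run()


class EchoCommand(BaseCommand):
    """Hypothetical command that just returns its configured message."""

    def run(self):
        return "echo: %s" % self.config.get("message", "")


def echo():
    """What a console_scripts entry point would call."""
    command = EchoCommand({"message": "hi"})
    return command.execute()


result = echo()
```

Every command then only has to care about its own run method.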
tail.py
Let’s talk about one command: fedmsg-tail, which prints the messages from the endpoints mentioned in config to stdout.
When the user does:
`fedmsg-tail --really-pretty`
this happens:
- setup.py tells us that this is bound to fedmsg-tail=fedmsg.commands.tail:tail [commands]. So, fedmsg.commands.tail’s tail function gets called. It does this:
def tail():
    command = TailCommand()
    return command.execute()
- It creates an instance of TailCommand and calls its execute method. execute is defined in the parent class BaseCommand, and it calls the run method.
- The run method does this:
def run(self):
    # Disable sending
    self.config['publish_endpoint'] = None
    # Disable timeouts. We want to tail forever!
    self.config['timeout'] = 0
    # Even though fedmsg-tail won't be sending any messages, give it a
    # name to conform with the other commands.
    self.config['name'] = 'relay_inbound'
    # Tail is never going to send any messages, so we suppress warnings
    # about having no publishing sockets established.
    self.config['mute'] = True
    fedmsg.init(**self.config)
Recall that self.config holds the config read from our fedmsg.d dir (among other places). We just change some config parameters and then create the FedMsgContext instance.
The run function also defines the format in which we will output the message, taking the flags (like --really-pretty) into consideration.
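Picking a formatter from the flags could look something like this. It is only a sketch of the idea: the really_pretty and pretty config keys are assumptions based on the flag name, and the formatters here use only the stdlib, so the output will not match what fedmsg-tail really prints.

```python
import json
import pprint


def choose_formatter(config):
    """Return a function that turns a message dict into a printable string."""
    if config.get("really_pretty"):
        # Indented, key-sorted JSON as a stand-in for the fanciest output.
        return lambda message: json.dumps(message, indent=2, sort_keys=True)
    if config.get("pretty"):
        return pprint.pformat
    # Default: compact single-line JSON.
    return lambda message: json.dumps(message, sort_keys=True)


message = {"topic": "testing", "msg": {"log": "hello"}}
pretty_output = choose_formatter({"really_pretty": True})(message)
plain_output = choose_formatter({})(message)
```

The loop that prints messages then just calls formatter(message) without caring which one was chosen.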
Finally, we get the messages for the logger like so:
for name, ep, topic, message in fedmsg.tail_messages(**self.config):
    # some if-conditionals omitted for brevity
    output = formatter(message)
    if output:
        self.log.info(output)
Recall that fedmsg’s __init__.py has the tail_messages function, which in fact just calls the corresponding method on the FedMsgContext object. Its tail_messages:
@API_function(doc=fedmsg.core.FedMsgContext.tail_messages.__doc__)
def tail_messages(**kw):
    for item in __local.__context.tail_messages(**kw):
        yield item
Note how the self.config of the tail command is sent to initialize the FedMsgContext object (after taking in the command line flags)
Recall that __local.__context is the instance of FedMsgContext.
core.FedMsgContext’s tail_messages:
def tail_messages(self, topic="", passive=False, **kw):
    """ Tail messages on the bus.
    Generator that yields tuples of the form:
    ``(name, endpoint, topic, message)``
    """
    poller, subs = self._create_poller(topic=topic, passive=False, **kw)
    try:
        for msg in self._poll(poller, subs):
            yield msg
    finally:
        self._close_subs(subs)
The _create_poller method iterates through each endpoint in config['endpoints'] and creates socket connections to them. We then give these sockets to zmq.Poller to poll and give us the messages.
It is all just fancy socket play guided by zmq’s machinery.
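To see the "register many sockets, poll, yield whatever arrives" pattern without needing zmq installed, here is a sketch that swaps in the stdlib selectors module and plain socket pairs for zmq.Poller and SUB sockets. It is an analogy, not fedmsg's implementation: the tail_messages below is a hypothetical function, and unlike the real one it stops once a poll times out so that the demo terminates.

```python
import selectors
import socket


def tail_messages(subscriptions, timeout=1.0):
    """Yield (name, text) tuples as data arrives on any of the given sockets.

    `subscriptions` maps an endpoint name to a connected socket.
    """
    selector = selectors.DefaultSelector()
    for name, sock in subscriptions.items():
        selector.register(sock, selectors.EVENT_READ, data=name)
    try:
        while selector.get_map():
            events = selector.select(timeout)
            if not events:
                return  # nothing arrived within the timeout
            for key, _mask in events:
                payload = key.fileobj.recv(4096)
                if not payload:
                    # The peer closed the connection; stop watching it.
                    selector.unregister(key.fileobj)
                    continue
                yield key.data, payload.decode("utf-8")
    finally:
        # Mirrors the cleanup in the real generator's finally block.
        selector.close()


# Two socket pairs play the role of two endpoints.
pub_a, sub_a = socket.socketpair()
pub_b, sub_b = socket.socketpair()
pub_a.sendall(b"hello from a")
pub_b.sendall(b"hello from b")

received = sorted(tail_messages({"a": sub_a, "b": sub_b}, timeout=0.2))

for sock in (pub_a, sub_a, pub_b, sub_b):
    sock.close()
```

The try/finally around the loop is the same trick the real tail_messages uses: however the generator ends, the sockets get cleaned up.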
logger.py
tail was for subscribing. Let’s look at how fedmsg publishes.
As usual, fedmsg-logger runs the fedmsg.commands.logger.logger function (see setup.py), which calls execute, which calls run (yada-yada-yada).
The run function is super simple:
def run(self):
    self.config['active'] = True
    self.config['name'] = 'relay_inbound'
    fedmsg.init(**self.config)
    if self.config.get('logger_message'):
        self._log_message(self.config, self.config.get('logger_message'))
    elif self.config['json_input']:
        self._log_message(self.config, sys.stdin.read())
    else:
        line = sys.stdin.readline()
        while line:
            self._log_message(self.config, line.strip())
            line = sys.stdin.readline()

# the publish method simply calls fedmsg.__init__'s publish
def _log_message(self, kw, message):
    if kw['json_input']:
        msg = fedmsg.encoding.loads(message)
    else:
        msg = {'log': message}
    fedmsg.publish(
        topic=kw['topic'],
        msg=msg,
        modname=kw['modname'],
    )
We know what publish in fedmsg’s __init__.py does: it calls publish on the FedMsgContext instance. We know what that does too: it uses the zmq API send_multipart to send the message.
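The input handling in run and _log_message boils down to "one message per line, unless the input is JSON, in which case the whole input is one message". A standalone sketch of just that part, with nothing published: build_message and messages_from_stream are hypothetical helpers, and the stdlib json.loads stands in for fedmsg.encoding.loads.

```python
import io
import json


def build_message(raw, json_input=False):
    """Turn raw text into a message dict, as _log_message does before publishing."""
    if json_input:
        return json.loads(raw)
    return {"log": raw}


def messages_from_stream(stream, json_input=False):
    """Yield one message for the whole stream (JSON) or one per line (plain text)."""
    if json_input:
        yield build_message(stream.read(), json_input=True)
        return
    for line in stream:
        yield build_message(line.strip())


plain = list(messages_from_stream(io.StringIO("first\nsecond\n")))
structured = list(
    messages_from_stream(io.StringIO('{"user": "me", "ok": true}'), json_input=True)
)
```

Each dict that comes out of this is what would be handed to fedmsg.publish as msg.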
consumer dir
In fedmsg, there is also the consumer dir. This is used if zmq is
larger picture
So, the entire Fedora infrastructure ecosystem is pretty cool. Here are the big pieces and how they fit together:
- fedmsg
- this is the core of it all. It provides applications with an API to push to the fedmsg bus or listen to events on the fedmsg bus.
- It reads from a dir of config files (among other places). At the base of it all is the core.py file that implements the FedMsgContext class. That class is a sort of wrapper around zmq in some places, and in others it uses the zmq API directly to take care of publishing and receiving messages.
- the project includes some command scripts that are ready-to-run recipes for running fedmsg as a logger, relay, ircbot etc.
- internally, to subscribe we just read the endpoints off of the config dir and create socket connections to them, which are then polled by zmq
- to publish, it is even simpler. Just use the zmq API to send the messages over our publish socket.
- one interesting thing is this. Why do we have a separate __init__.py file with all the methods of the FedMsgContext class? This is so that we can provide a better API to the end user, which is actually one of the agreed-on goals of fedmsg
- datanommer
- this is just a fedmsg consumer that subscribes to all the fedmsg topics and puts the data in a postgres database
- it extends fedmsg.consumers.FedmsgConsumer with topics as *.
- it also has two more components:
- models: they define a model for the received messages, which are then dumped to a database using SQLAlchemy
- commands: provides some commands to interact with the database and with the consumer
- datagrepper
- it is a very simple flask app that queries datanommer
- it uses the datanommer.models.Messages.grep function which accepts all the filters and using the ORM, returns the results.
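The datanommer/datagrepper split ("one side stores every message, the other side queries with filters") can be sketched in a few lines. This is a toy model, not either project's code: MessageStore, consume and this grep are hypothetical, and an in-memory sqlite3 database stands in for postgres and the SQLAlchemy models.

```python
import json
import sqlite3


class MessageStore:
    """Toy stand-in for the storing side (consume) and the querying side (grep)."""

    def __init__(self):
        self.db = sqlite3.connect(":memory:")
        self.db.execute(
            "CREATE TABLE messages ("
            "id INTEGER PRIMARY KEY, topic TEXT NOT NULL, body TEXT NOT NULL)"
        )

    def consume(self, topic, msg):
        """Called for every message on the bus, whatever its topic."""
        self.db.execute(
            "INSERT INTO messages (topic, body) VALUES (?, ?)",
            (topic, json.dumps(msg)),
        )
        self.db.commit()

    def grep(self, topic=None, contains=None):
        """Return (topic, message) pairs matching all the filters that are set."""
        query = "SELECT topic, body FROM messages"
        clauses, params = [], []
        if topic is not None:
            clauses.append("topic = ?")
            params.append(topic)
        if contains is not None:
            clauses.append("body LIKE ?")
            params.append("%" + contains + "%")
        if clauses:
            query += " WHERE " + " AND ".join(clauses)
        query += " ORDER BY id"
        return [(t, json.loads(b)) for t, b in self.db.execute(query, params)]


store = MessageStore()
store.consume("logger.log", {"log": "hello"})
store.consume("announce.announcement", {"message": "release day"})
store.consume("logger.log", {"log": "goodbye"})

by_topic = store.grep(topic="logger.log")
by_text = store.grep(contains="release")
```

A web app like datagrepper then only needs to translate query-string parameters into the keyword arguments of the grep call.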
- statscache
- TODO