Fedmsg is amazing. Need to explore more!

So, how do you go about making sense of this project? Let’s see.

setup.py

This file defines several entry points that you get when you install fedmsg.

    entry_points={
        'console_scripts': [
            "fedmsg-logger=fedmsg.commands.logger:logger [commands]",
            "fedmsg-tail=fedmsg.commands.tail:tail [commands]",
            "fedmsg-collectd=fedmsg.commands.collectd:collectd [commands]",
            "fedmsg-announce=fedmsg.commands.announce:announce [commands]",
            "fedmsg-trigger=fedmsg.commands.trigger:trigger [commands]",
            "fedmsg-dg-replay=fedmsg.commands.replay:replay [commands]",
            #"fedmsg-config=fedmsg.commands.config:config [commands]",
            "fedmsg-hub=fedmsg.commands.hub:hub [consumers]",
            "fedmsg-relay=fedmsg.commands.relay:relay [consumers]",
            "fedmsg-gateway=fedmsg.commands.gateway:gateway [consumers]",
            "fedmsg-irc=fedmsg.commands.ircbot:ircbot [consumers]",
        ],
        'moksha.consumer': [
            "fedmsg-dummy=fedmsg.consumers.dummy:DummyConsumer [consumers]",
            "fedmsg-relay=fedmsg.consumers.relay:RelayConsumer [consumers]",
            "fedmsg-gateway=fedmsg.consumers.gateway:GatewayConsumer [consumers]",
            "fedmsg-ircbot=fedmsg.consumers.ircbot:IRCBotConsumer [consumers]",
        ],
        'moksha.producer': [
        ],
        # fedmsg core only provides one metadata provider.
        'fedmsg.meta': [
            "logger=fedmsg.meta.logger:LoggerProcessor",
            "announce=fedmsg.meta.announce:AnnounceProcessor",
        ],

Each fedmsg-* script is implemented by a module in fedmsg.commands. The part after the colon names the function in that module that runs when you invoke the script.
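To make the entry point syntax concrete, here is a minimal sketch of how a spec such as "name=module:function [extra]" can be taken apart and resolved to a callable. This is not how setuptools does it internally; parse_entry_point and resolve are hypothetical helper names, and the resolve demo points at the stdlib json module so it runs without fedmsg installed.

```python
import importlib
import re

# name = module:attr [extra, ...]   (the bracketed extras are optional)
ENTRY_POINT = re.compile(
    r"^(?P<name>[^=\s]+)\s*=\s*(?P<module>[\w.]+):(?P<attr>[\w.]+)"
    r"(?:\s*\[(?P<extras>[^\]]*)\])?$"
)


def parse_entry_point(spec):
    """Split 'name=module:attr [extra]' into its four parts."""
    match = ENTRY_POINT.match(spec.strip())
    if match is None:
        raise ValueError("not an entry point spec: %r" % spec)
    extras = match.group("extras") or ""
    return {
        "name": match.group("name"),
        "module": match.group("module"),
        "attr": match.group("attr"),
        "extras": [e.strip() for e in extras.split(",") if e.strip()],
    }


def resolve(spec):
    """Import the module and return the object the spec points at."""
    parts = parse_entry_point(spec)
    target = importlib.import_module(parts["module"])
    for piece in parts["attr"].split("."):
        target = getattr(target, piece)
    return target


parsed = parse_entry_point("fedmsg-tail=fedmsg.commands.tail:tail [commands]")

# Resolve a stdlib target so the example runs without fedmsg installed.
dumps = resolve("demo-dumps=json:dumps")
```

So running fedmsg-tail on the command line amounts to importing fedmsg.commands.tail and calling its tail function.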

fedmsg.d

Each file in this directory defines a dict called config whose key-value pairs are used in the code as config parameters. For example, relay.py defines relay_outbound (under endpoints) and relay_inbound, whose values are the addresses that serve as the relay's outlet and inlet ports.

config = dict(
    endpoints={
        "relay_outbound": [
            "tcp://127.0.0.1:4001",
        ],
    },
    relay_inbound=[
        "tcp://127.0.0.1:2003",
    ],
)

Config parameters are read from the following sources in order, with later sources overriding earlier ones:

- Built-in defaults
- Config file (/etc/fedmsg.d/*.py)
- Config file (./fedmsg.d/*.py)
- Command line arguments

The config.py module in fedmsg reads ./fedmsg.d (and the other locations) and builds up the config dict in its load_config function.
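The layering can be sketched roughly as follows. This is an illustration of the idea, not fedmsg's actual load_config: load_dir and load_layers are hypothetical names, two temporary directories stand in for /etc/fedmsg.d and ./fedmsg.d, and the merge is a plain shallow dict.update, so a nested key such as endpoints would be replaced wholesale rather than merged.

```python
import glob
import os
import runpy
import tempfile


def load_dir(directory):
    """Merge the `config` dict from every *.py file in `directory`."""
    merged = {}
    for path in sorted(glob.glob(os.path.join(directory, "*.py"))):
        namespace = runpy.run_path(path)  # execute the file, get its globals
        merged.update(namespace.get("config", {}))
    return merged


def load_layers(defaults, directories, cli_args):
    """Apply the layers in order; later layers override earlier ones."""
    config = dict(defaults)
    for directory in directories:
        config.update(load_dir(directory))
    config.update(cli_args)
    return config


# Two throwaway directories stand in for /etc/fedmsg.d and ./fedmsg.d.
with tempfile.TemporaryDirectory() as system_dir, \
        tempfile.TemporaryDirectory() as local_dir:
    with open(os.path.join(system_dir, "relay.py"), "w") as handle:
        handle.write(
            'config = dict(relay_inbound=["tcp://127.0.0.1:2003"], timeout=5)\n'
        )
    with open(os.path.join(local_dir, "base.py"), "w") as handle:
        handle.write("config = dict(timeout=10)\n")

    config = load_layers(
        defaults={"timeout": 0, "mute": False},
        directories=[system_dir, local_dir],
        cli_args={"mute": True},
    )
```

Here timeout ends up as 10 because the local directory is read after the system one, and mute ends up True because command line arguments are applied last.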

__init__.py

This file imports fedmsg.core and fedmsg.config to do the heavy lifting.

  • fedmsg.config
    • Just verifies that the config is fine and returns the config dict when called by __init__.py
    • Used in the __init__.py file: config = fedmsg.config.load_config([], None).
  • fedmsg.core
    • connects to the zmq machinery
        self.publisher = self.context.socket(zmq.PUB)
    
        set_high_water_mark(self.publisher, config)
        set_tcp_keepalive(self.publisher, config)
    
        if method == 'connect':
            self.publisher.setsockopt(zmq.LINGER, config['zmq_linger'])
    
    • after FedMsgContext.__init__ runs, the __local.__context attribute holds an instance of FedMsgContext. This instance is returned when fedmsg.init is called, which is what the command files do.

    • __init__.py also defines module-level functions such as publish and destroy, which just call the corresponding methods on the FedMsgContext instance.

    Example:

    # this is from __init__.py
    @API_function(doc=fedmsg.core.FedMsgContext.publish.__doc__)
    def publish(topic=None, msg=None, **kw):
        return __local.__context.publish(topic, msg, **kw)
    

    These functions are used by the commands. Example:

    # this is from logger.py
    fedmsg.publish(
        topic=kw['topic'],
        msg=msg,
        modname=kw['modname'])
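The delegation pattern described here can be sketched in a few lines. This is a simplified stand-in, assuming the context is kept in a thread-local holder: the Context class below is hypothetical and only records what was published, whereas the real FedMsgContext talks to zmq.

```python
import threading


class Context(object):
    """Hypothetical stand-in for FedMsgContext; it only records messages."""

    def __init__(self, **config):
        self.config = config
        self.sent = []

    def publish(self, topic=None, msg=None, **kw):
        self.sent.append((topic, msg))


# One context per thread, stored on a thread-local object.
_local = threading.local()


def init(**config):
    """Create the per-thread context once and return it."""
    if hasattr(_local, "context"):
        raise ValueError("context already initialized in this thread")
    _local.context = Context(**config)
    return _local.context


def publish(topic=None, msg=None, **kw):
    """Module-level convenience wrapper that delegates to the context."""
    if not hasattr(_local, "context"):
        init()
    return _local.context.publish(topic, msg, **kw)


context = init(name="relay_inbound")
publish(topic="demo", msg={"log": "hello"})
```

Callers only ever touch the module-level functions, so they never have to create or pass around a context object themselves.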
    

commands dir

  • The commands dir has one file per command. Each of them uses fedmsg in a certain way.
  • All the commands extend the BaseCommand class from fedmsg/commands/__init__.py
    • The __init__ method of BaseCommand gets the config from fedmsg.config.load_config and assigns it to self.config
    • Also, if daemon is set to True in the config, BaseCommand “daemonizes” the command
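The shape of this class hierarchy can be sketched as below. It is a simplified illustration, not the real BaseCommand: the config is passed in as a plain dict instead of being read with fedmsg.config.load_config, the daemon branch only sets a marker key, and EchoCommand and echo are hypothetical names.

```python
class BaseCommand(object):
    """Hypothetical base class: load config in __init__, run via execute()."""

    def __init__(self, config=None):
        # The real class reads its config with fedmsg.config.load_config;
        # a plain dict is passed in here to keep the example self-contained.
        self.config = dict(config or {})

    def run(self):
        raise NotImplementedError("subclasses implement run()")

    def execute(self):
        if self.config.get("daemon"):
            # A real command would detach from the terminal here; this
            # sketch only records that the branch was taken.
            self.config["daemonized"] = True
        return self.run()


class EchoCommand(BaseCommand):
    """Hypothetical subclass that tweaks the config before doing its work."""

    def run(self):
        self.config["name"] = "relay_inbound"
        return "running as %s" % self.config["name"]


def echo():
    """The kind of function a console_scripts entry point points at."""
    command = EchoCommand({"daemon": False})
    return command.execute()


result = echo()
```

Each concrete command only has to supply run; config loading and the optional daemon step live in the shared base class.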

tail.py

Let’s talk about one command: fedmsg-tail, which prints the messages from the endpoints mentioned in config to stdout.

When the user does:

`fedmsg-tail --really-pretty`

this happens:

  • setup.py tells us that this is bound to fedmsg-tail=fedmsg.commands.tail:tail [commands]. So, the tail function in fedmsg.commands.tail gets called. It does this:
  def tail():
    command = TailCommand()
    return command.execute()
  • It creates an instance of TailCommand and calls its execute method. execute is defined in the parent class BaseCommand, and it calls the run method.

  • The run method does this:

      def run(self):
        # Disable sending
        self.config['publish_endpoint'] = None

        # Disable timeouts.  We want to tail forever!
        self.config['timeout'] = 0

        # Even though fedmsg-tail won't be sending any messages, give it a
        # name to conform with the other commands.
        self.config['name'] = 'relay_inbound'

        # Tail is never going to send any messages, so we suppress warnings
        # about having no publishing sockets established.
        self.config['mute'] = True

        fedmsg.init(**self.config)

Recall that self.config holds the config read from our fedmsg.d dir (among other places). We just change some config parameters and then create the FedMsgContext instance.

The run function also defines the format in which we will output the message after taking into consideration the flags (like --really-pretty)
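Picking a formatter from the flags could look roughly like this. It is a sketch using only the standard library: the config keys pretty and really_pretty are assumptions about how the flags are stored, and json.dumps with indentation stands in for whatever fancier rendering the real command does.

```python
import json
import pprint


def build_formatter(config):
    """Pick a message formatter from (assumed) boolean config flags."""
    if config.get("really_pretty"):
        # Indented JSON as a stand-in for a syntax-highlighted dump.
        return lambda message: "\n" + json.dumps(
            message, indent=2, sort_keys=True
        )
    if config.get("pretty"):
        return lambda message: "\n" + pprint.pformat(message)
    # Default: hand the message back untouched.
    return lambda message: message


message = {"topic": "demo", "msg": {"log": "hi"}}
plain = build_formatter({})(message)
fancy = build_formatter({"really_pretty": True})(message)
```

The loop that consumes messages then only needs to call formatter(message), without caring which flag was given.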

Finally, we get the messages for the logger like so:

    for name, ep, topic, message in fedmsg.tail_messages(**self.config):
        # some if-conditionals omitted for brevity
        output = formatter(message)
        if output:
            self.log.info(output)

Recall that fedmsg/__init__.py has the tail_messages function, which in fact just calls the corresponding method on the FedMsgContext object. Here it is:

@API_function(doc=fedmsg.core.FedMsgContext.tail_messages.__doc__)
def tail_messages(**kw):
    for item in __local.__context.tail_messages(**kw):
        yield item

Note how the self.config of the tail command is sent to initialize the FedMsgContext object (after taking in the command line flags)

Recall that __local.__context is the FedMsgContext instance. Here is its tail_messages method from core.py:

    def tail_messages(self, topic="", passive=False, **kw):
        """ Tail messages on the bus.

        Generator that yields tuples of the form:
        ``(name, endpoint, topic, message)``
        """
        poller, subs = self._create_poller(topic=topic, passive=False, **kw)
        try:
            for msg in self._poll(poller, subs):
                yield msg
        finally:
            self._close_subs(subs)

The _create_poller method iterates through each endpoint in config['endpoints'] and creates socket connections to them. We then hand these sockets to zmq.Poller, which polls them and gives us the messages.

It is all just fancy socket play guided by zmq’s machinery.
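The same "poll several sockets, yield what arrives, clean up in finally" shape can be shown without zmq. The sketch below deliberately swaps zmq.Poller for the standard library's selectors module and uses plain socket pairs instead of SUB sockets, so it illustrates the pattern rather than fedmsg's implementation. The endpoint names are made up, and the generator stops after a quiet period instead of tailing forever.

```python
import selectors
import socket


def tail_messages(subs, timeout=1.0):
    """Yield (name, text) tuples from several readable sockets.

    `subs` maps a made-up endpoint name to a connected socket.
    """
    poller = selectors.DefaultSelector()
    for name, sock in subs.items():
        poller.register(sock, selectors.EVENT_READ, data=name)
    try:
        while poller.get_map():
            events = poller.select(timeout)
            if not events:
                return  # nothing arrived within the timeout; stop tailing
            for key, _mask in events:
                payload = key.fileobj.recv(4096)
                if not payload:
                    poller.unregister(key.fileobj)  # peer closed its end
                    continue
                yield key.data, payload.decode("utf-8")
    finally:
        # Always release the sockets, even if the consumer stops early.
        poller.close()
        for sock in subs.values():
            sock.close()


a_send, a_recv = socket.socketpair()
b_send, b_recv = socket.socketpair()
a_send.sendall(b"hello from a")
b_send.sendall(b"hello from b")

received = sorted(
    tail_messages({"endpoint-a": a_recv, "endpoint-b": b_recv}, timeout=0.2)
)

a_send.close()
b_send.close()
```

With zmq the registration and polling calls differ, but the generator-plus-finally structure is the same as in the tail_messages method quoted above.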

logger.py

The tail was for subscribing. Let’s look at fedmsg for publishing.

As usual, fedmsg-logger runs the logger function in fedmsg.commands.logger. It creates the command object and calls execute, which calls run (yada-yada-yada).

The run function is super simple:

    def run(self):
        self.config['active'] = True
        self.config['name'] = 'relay_inbound'
        fedmsg.init(**self.config)

        if self.config.get('logger_message'):
            self._log_message(self.config, self.config.get('logger_message'))
        elif self.config['json_input']:
            self._log_message(self.config, sys.stdin.read())
        else:
            line = sys.stdin.readline()
            while line:
                self._log_message(self.config, line.strip())
                line = sys.stdin.readline()

    # the publish method simply calls fedmsg.__init__'s publish
    def _log_message(self, kw, message):
        if kw['json_input']:
            msg = fedmsg.encoding.loads(message)
        else:
            msg = {'log': message}

        fedmsg.publish(
            topic=kw['topic'],
            msg=msg,
            modname=kw['modname'],
        )

We know what the publish function in fedmsg/__init__.py does: it calls publish on the FedMsgContext instance. That, in turn, uses the zmq API's send_multipart to send the message.
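The input handling in run and _log_message boils down to "one message per line, or one JSON document for the whole input". Here is a minimal sketch of just that part, with the publishing step left out. build_message and read_messages are hypothetical names, and the stdlib json.loads stands in for fedmsg.encoding.loads.

```python
import io
import json


def build_message(raw, json_input=False):
    """Turn raw input text into the dict that would be published."""
    if json_input:
        return json.loads(raw)
    return {"log": raw}


def read_messages(stream, json_input=False):
    """Yield one message per line, or a single message for JSON input."""
    if json_input:
        yield build_message(stream.read(), json_input=True)
        return
    for line in stream:
        yield build_message(line.strip())


# io.StringIO objects stand in for sys.stdin.
plain = list(read_messages(io.StringIO("first\nsecond\n")))
structured = list(read_messages(io.StringIO('{"a": 1}'), json_input=True))
```

Each dict produced this way is what gets handed to fedmsg.publish as msg.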

consumers dir

In fedmsg, there is also the consumers dir. This is used if zmq is

larger picture

So, the entire Fedora infrastructure ecosystem is pretty cool. Here are the big pieces and how they fit together:

  • fedmsg
    • this is the core of it all. It provides applications with an API to push to the fedmsg bus or listen to events on the fedmsg bus.
    • it reads from a dir of config files (among other places). At the base of it all is the core.py file that implements the FedMsgContext class. That class is a thin wrapper around zmq in some places, and in others it uses the zmq API directly to take care of publishing and receiving messages.
    • the project includes some command scripts that are ready-to-run recipes for running fedmsg as a logger, relay, ircbot etc
    • internally, to subscribe we just read the endpoints from the config and create socket connections to them, which are then polled by zmq
    • to publish, it is even simpler. Just use the zmq API to send the messages over our publish socket.
    • one interesting question: why do we have a separate __init__.py file mirroring the methods of the FedMsgContext class? This is so that we can provide a better API to the end user, which is one of the agreed-on goals of fedmsg
  • datanommer
    • this is just a fedmsg consumer that subscribes to all the fedmsg topics and puts the data in a Postgres database
    • it extends the fedmsg.consumers.FedmsgConsumer with topics as *.
    • it also has two more components.
      • models – they define a model for the messages received which are then dumped to a database using SQLAlchemy
      • commands – provides some commands to interact with the database and with the consumer
  • datagrepper
    • it is a very simple flask app that queries datanommer
    • it uses the datanommer.models.Message.grep function, which accepts all the filters and returns the results using the ORM.
  • statscache
    • TODO
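To make the datanommer and datagrepper split concrete, here is a toy version of the "store everything, then query with filters" idea. It is only a sketch: an in-memory sqlite3 database stands in for Postgres and SQLAlchemy, and open_store, consume, and grep are hypothetical functions, not the real datanommer API.

```python
import json
import sqlite3


def open_store():
    """Create an in-memory database with a single messages table."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE messages (topic TEXT, timestamp REAL, body TEXT)"
    )
    return conn


def consume(conn, message):
    """What a catch-all consumer would do with each message it receives."""
    conn.execute(
        "INSERT INTO messages (topic, timestamp, body) VALUES (?, ?, ?)",
        (message["topic"], message["timestamp"], json.dumps(message["msg"])),
    )


def grep(conn, topic=None, start=None):
    """Return stored messages matching the optional filters, oldest first."""
    clauses, params = [], []
    if topic is not None:
        clauses.append("topic = ?")
        params.append(topic)
    if start is not None:
        clauses.append("timestamp >= ?")
        params.append(start)
    where = (" WHERE " + " AND ".join(clauses)) if clauses else ""
    rows = conn.execute(
        "SELECT topic, timestamp, body FROM messages" + where
        + " ORDER BY timestamp",
        params,
    )
    return [
        {"topic": t, "timestamp": ts, "msg": json.loads(body)}
        for t, ts, body in rows
    ]


store = open_store()
consume(store, {"topic": "demo.a", "timestamp": 1.0, "msg": {"log": "one"}})
consume(store, {"topic": "demo.b", "timestamp": 2.0, "msg": {"log": "two"}})
consume(store, {"topic": "demo.a", "timestamp": 3.0, "msg": {"log": "three"}})

matches = grep(store, topic="demo.a", start=2.0)
```

In this picture the consumer side only ever calls consume, and the web app side only ever calls grep, which is roughly how the two projects divide the work.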