In my last post, I mentioned my GSoC project with Fedora: Centralized Metrics generation.

Let’s take up fedmsg first.

Fedmsg

It is a Python package that defines a brokerless architecture to send messages between different apps in the ecosystem. It was inspired by Debian. The raw feed is available via datagrepper - https://apps.fedoraproject.org/datagrepper/raw

It is really easy to print out the datafeed or even to push messages to the bus.
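
For instance, a couple of lines of Python are enough for either, assuming a working fedmsg configuration on the machine (the topic and body below are placeholders of my own):

    import fedmsg

    # Push a message to the bus (placeholder topic and body).
    fedmsg.publish(topic='testing', modname='test', msg={'test': 'Hello World'})

    # Print the live feed: tail_messages() yields (name, endpoint, topic, msg) tuples.
    for name, endpoint, topic, msg in fedmsg.tail_messages():
        print(topic, msg)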

Several components of Fedora:

  • koji - the build system. It is the software that builds RPM packages for the Fedora project. Packagers use the koji client to request package builds.
  • fedpkg - the primary interface to the packaging system.
  • pkgdb - the package database.

Making the projects work with each other is simple: one project subscribes to a topic and provides a callback function that is called when the event occurs.
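
For illustration, here is a rough sketch of such a subscriber, assuming the fedmsg-hub consumer machinery; the topic and config key are made-up examples rather than something from a real project:

    import fedmsg.consumers

    class BuildListener(fedmsg.consumers.FedmsgConsumer):
        # The hub only activates this consumer if this key is True in the config.
        config_key = 'buildlistener.enabled'
        # Only messages whose topic matches this pattern get delivered.
        topic = 'org.fedoraproject.prod.buildsys.build.state.change'

        def consume(self, message):
            # The callback: the hub calls this for every matching message.
            print(message)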

Architecture

There are several ways to make this happen. One is the AMQP protocol, which needs a central broker to pass messages: it receives messages from the producers and sends them on to the consumers. This introduces a single point of failure.

Then there is the brokerless architecture: 0mq (aka zmq, ZeroMQ, etc.).

Each producer streams over a port and the consumer listens on that port for the messages. This increases throughput considerably.
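
Underneath, this is plain 0mq PUB/SUB. A rough pyzmq sketch of the consuming side (the endpoint address is only an example):

    import zmq

    ctx = zmq.Context()
    sub = ctx.socket(zmq.SUB)
    # Connect straight to a producer's endpoint; there is no broker in between.
    sub.connect('tcp://hub.fedoraproject.org:9940')
    # Filter by topic prefix.
    sub.setsockopt_string(zmq.SUBSCRIBE, u'org.fedoraproject.')

    while True:
        # Each message arrives as a (topic, JSON payload) multipart frame.
        topic, payload = sub.recv_multipart()
        print(topic, payload)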

But the problem here is service discovery: how does a project know where to look for messages? Which endpoint, which port?

Fedmsg keeps a plain list of producers and their endpoints and ports in its configuration.
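
Concretely, this is the `endpoints` dict in the fedmsg config; a trimmed-down sketch of what an entry looks like (the service names and ports here are illustrative):

    config = dict(
        endpoints={
            # each producing service declares one or more tcp endpoints
            "bodhi.app01": ["tcp://app01.fedoraproject.org:3000"],
            "koji.kojihub": ["tcp://koji.fedoraproject.org:3001"],
        },
    )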

The event topics follow a certain rule:

`org.fedoraproject.ENV.CATEGORY.OBJECT[.SUBOBJECT].EVENT`

That is the rule for topics; apart from this, the body of the message also follows certain conventions.
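
For example, a single entry on the raw feed looks roughly like this (the values are made up; the topic-specific body lives under `msg`):

    message = {
        "topic": "org.fedoraproject.prod.git.receive",
        "timestamp": 1402577851.0,
        "msg_id": "2014-abcdef12-3456-7890-abcd-ef1234567890",
        "i": 1,  # per-producer sequence number
        "msg": {
            # topic-specific body goes here
        },
    }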

Bus topology

The fedmsg bus is an NxM connection mesh, where each consumer connects to the producers it wants messages from. If you want to get messages from everyone, i.e. all the messages, you connect to every producer's endpoint.

datagrepper

In production, datagrepper depends on datanommer, which takes in all the messages on fedmsg and puts them in a PostgreSQL DB. This makes datagrepper's life very easy: it is just a Flask app with various endpoints for docs and one endpoint where it passes the filters over to datanommer, then parses and returns the result.

    try:
        # (Excerpt from datagrepper; the except clause is omitted here.)
        # This fancy classmethod does all of our search for us.
        total, pages, messages = dm.Message.grep(
            start=start and datetime.fromtimestamp(start),
            end=end and datetime.fromtimestamp(end),
            page=page,
            rows_per_page=rows_per_page,
            order=order,
            users=users,
            packages=packages,
            categories=categories,
            topics=topics,
            contains=contains,
            not_users=not_users,
            not_packages=not_packages,
            not_categories=not_categories,
            not_topics=not_topics,
        )
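
From the outside, that endpoint is just an HTTP API; a quick sketch of querying it with `requests` (the filter values are illustrative):

    import requests

    resp = requests.get(
        'https://apps.fedoraproject.org/datagrepper/raw',
        params={
            'category': 'git',      # same kind of filters the view passes to grep()
            'delta': 3600,          # only the last hour
            'rows_per_page': 10,
        },
    )
    for message in resp.json()['raw_messages']:
        print(message['topic'])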

It looks like I will have to look at datanommer to get a feel for how to subscribe to all the topics on fedmsg.

datanommer

“A message sink for fedmsg”

It consists of only a fedmsg consumer that stuffs every message into a SQLAlchemy database. There are also a handful of CLI tools to dump information from the database.

There are 3 components.

datanommer.commands

  • datanommer-create-db
  • datanommer-dump
  • datanommer-stats
  • datanommer-latest

Basically, these are commands to interact with the database through the ORM.

datanommer.consumer

It just subscribes to all the topics:

class Nommer(fedmsg.consumers.FedmsgConsumer):
    topic = "*"
    config_key = 'datanommer.enabled'

Dependencies:

  • fedmsg-relay - relay connections from active loggers to the bus

    It is a service which binds to two ports: it listens for messages on one and emits them on the other. fedmsg-logger requires that an instance of the relay be running somewhere.

  • fedmsg-hub - all-purpose daemon

    It listens to every endpoint discovered by fedmsg.config and forwards the messages to locally-declared consumers.

  • fedmsg-logger - emits log messages to the fedmsg bus

    # From fedmsg-logger: wrap the CLI input into a message body and publish it.
    def _log_message(self, kw, message):
        if kw['json_input']:
            msg = fedmsg.encoding.loads(message)
        else:
            msg = {'log': message}

        fedmsg.publish(
            topic=kw['topic'],
            msg=msg,
            modname=kw['modname'],
        )

datanommer.models

This package contains the SQLAlchemy data model for datanommer.

# Excerpt from datanommer.models (source_version_default is defined elsewhere in that module).
from sqlalchemy import Column, DateTime, Integer, UnicodeText

class BaseMessage(object):
    id = Column(Integer, primary_key=True)
    msg_id = Column(UnicodeText, nullable=True, unique=True, default=None, index=True)
    i = Column(Integer, nullable=False)
    topic = Column(UnicodeText, nullable=False)
    timestamp = Column(DateTime, nullable=False, index=True)
    certificate = Column(UnicodeText)
    signature = Column(UnicodeText)
    category = Column(UnicodeText, nullable=False)
    username = Column(UnicodeText)
    crypto = Column(UnicodeText)
    source_name = Column(UnicodeText, default=u"datanommer")
    source_version = Column(UnicodeText, default=source_version_default)
    _msg = Column(UnicodeText, nullable=False)
    _headers = Column(UnicodeText)
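
Putting the models and `grep()` together, querying the store directly looks roughly like this; the database URI is a placeholder and `init()` is my assumption of how the session gets wired up:

    import datanommer.models as dm

    # Placeholder URI; init() sets up the SQLAlchemy session (my assumption).
    dm.init('postgresql://user:password@localhost/datanommer')

    total, pages, messages = dm.Message.grep(
        topics=['org.fedoraproject.prod.git.receive'],
        rows_per_page=20,
        page=1,
    )
    for msg in messages:
        print(msg.timestamp, msg.topic)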

fedmsg.d

One interesting thing is that each of the projects has a fedmsg.d dir, which is where fedmsg.config will try to read the config from whenever the fedmsg API is invoked. So, for consuming from or producing to fedmsg, you will need that dir in the project root.

./fedmsg.d/ holds the local settings; /etc/fedmsg.d/ is the global one.
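
A quick way to see the merged result is `fedmsg.config.load_config()`, which reads both locations:

    import fedmsg.config

    # Load the merged configuration: /etc/fedmsg.d/ plus the local ./fedmsg.d/.
    config = fedmsg.config.load_config()
    print(list(config['endpoints'].keys()))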