
blueprint for a message broker bridge NApp #457

Open
viniarck opened this issue Feb 19, 2024 · 6 comments · Fixed by atlanticwave-sdx/sdx-oxp-integrator#6
@viniarck
Member

There are use cases where a NApp subscribes to Kytos events and publishes them to an external message broker. This is for potentially integrating with other platforms that already use a message broker and/or with new external applications.

Use cases will be further refined and discussed (future BAPM integration, SDX integration, and so on), but the idea is a bare minimum message broker bridge NApp that can forward events accordingly to a supported message broker.

Note: One day the Kytos-ng event bus might also natively support other message brokers, but strategically it was always part of the plan to have a lightweight, simplified event bus as we currently have in Python land, without external dependencies.

@viniarck viniarck added the future_release Planned for the next release label Feb 19, 2024
@viniarck
Member Author

Potentially the message broker will be RabbitMQ. This will be double confirmed as it gets prioritized.

@viniarck viniarck removed the future_release Planned for the next release label Feb 27, 2024
@lmarinve

lmarinve commented Mar 13, 2024

:EP: 38
:Title: Broadcasting events
:Authors:
- Luis;
- Italo;
- Vinicius;
- Jeronimo Bezerra [email protected];
:Issued Date: to be defined
:Status: Pending
:Type: Standards Track


EP03X - Broadcasting events


########
Abstract
########

This blueprint details how Kytos should export/broadcast its events
to external applications in a safe and scalable way.

##########
Motivation
##########

With Kytos running in production, we learned that Kytos events can be
leveraged for multiple purposes, from capacity planning to topology evaluation,
from troubleshooting to accounting. However, creating Kytos NApps just to monitor the environment should be avoided, to minimize possible impacts on the running Kytos instance. Moreover, this NApp has the potential to expedite the prototyping of new applications and frameworks that need an understanding of the
network topology and control plane, such as the AtlanticWave-SDX project.

############
Requirements
############

- This NApp should not be named after, or delegate part of its functions to, the message broker system.
- This NApp has to be an event broadcaster only. No input via message_bus should be supported/allowed.
- This NApp has to support multiple queues: one queue per NApp and a queue for all NApps (verbose_queue).
- This NApp is optional, meaning that if there is any issue with it, Kytos shouldn't be impacted.
- This NApp must use asyncio for all asynchronous calls, for instance, alisten_to instead of listen_to.
- This NApp has to keep consistency after reloading (re-read settings.py, close and reopen RabbitMQ sessions, with tests for these functionalities).
- This NApp must support filtering messages by specific regexes via settings.py (a list of regexes); see the sketch after this list.
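As a minimal sketch of the regex filtering requirement (the setting name EVENT_FILTERS and the helper below are hypothetical, not an existing Kytos API), settings.py could look like this:

    # settings.py (hypothetical names): regex patterns selecting which Kytos
    # events get forwarded to the broker; everything else is dropped.
    EVENT_FILTERS = [
        r"^kytos/topology\..*",
        r"^kytos/mef_eline\..*",
    ]

    # main.py (hypothetical helper): compile once, match event names on the hot path.
    import re

    COMPILED_FILTERS = [re.compile(pattern) for pattern in EVENT_FILTERS]

    def should_forward(event_name: str) -> bool:
        """Return True if the event name matches any configured regex."""
        return any(regex.match(event_name) for regex in COMPILED_FILTERS)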

The Broadcasting Events Napp (application) must be able to utilize any replaceable message transportation system. This requirement underscores the necessity for flexibility and adaptability in the system's architecture. Consequently, the Broadcasting Events Napp and the Events Consumer Napp should be isolated as microservice NApps, distinct from the Message Queue Producer and Consumer NApps. This architectural decision offers several benefits:

##########################
Modularity and Scalability
##########################

Microservices allow for modular design, enabling each component to be developed, deployed, and scaled independently. This modularity facilitates easier management and scaling of the system as a whole.

######################
Technology Agnosticism
######################

By isolating the Broadcasting Events Napp and the Events Consumer Napp, the system becomes agnostic to the specific message transportation system. This means that the choice of a message queue or transportation system can be easily swapped out or upgraded without affecting the core functionality of the microservices.

###############################
Ease of Maintenance and Updates
###############################

Separating the microservices reduces the complexity of each component, making maintenance and updates more straightforward. Changes to one microservice can be made without impacting the others, allowing for faster iteration and deployment cycles.

##############################
Resilience and Fault Isolation
##############################

Isolating microservices enhances fault isolation. If one microservice experiences issues or failures, they are less likely to impact the overall system's performance, ensuring greater resilience and stability of the application.

########################################
Scalability and Performance Optimization
########################################

Microservices architecture facilitates resource optimization by allowing individual components to be scaled independently based on demand. This scalability ensures optimal performance even during peak loads.

#################
Enhanced Security
#################

Isolating components into microservices can enhance security by reducing the attack surface area. Each microservice can have its security measures, such as access controls and authentication mechanisms, ensuring that security breaches are contained and mitigated effectively.

The message queue operates by receiving messages from the events originating application and distributing them to one or more recipient applications in a first-in-first-out (FIFO) manner. This architecture facilitates communication between different parts of a system without direct coupling.

Events Napp can establish separate message queues for SDX Napp and BAPM Napp to disseminate updates or commands. Events Napp would then dispatch distinct messages to each queue, and the relevant applications would retrieve messages from their designated queues.

Upon retrieval, the system removes messages from the queue to ensure each message is processed exactly once.

Message queues decouple components within the system. Events Napp can transmit updates without the continuous availability of SDX Napp or BAPM applications. Moreover, the persistent nature of message queues ensures that if any application experiences a restart, it can seamlessly resume processing messages from its designated queue once it is back online.

This approach enhances scalability and fault tolerance in applications by reducing dependencies between interconnected systems. Additionally, it facilitates better handling of system failures or temporary unavailability, thereby strengthening the overall robustness of the architecture.

RabbitMQ, a widely used message broker, communicates over AMQP 0.9.1, an efficient and versatile protocol. This combination ensures reliable communication between the different components of a distributed system.

aio-pika is an asynchronous AMQP client library designed for Python applications. It enables asynchronous and efficient interaction with RabbitMQ, making it well-suited for high-performance applications or systems that require non-blocking I/O operations.

By leveraging aio-pika, we can integrate RabbitMQ's messaging capabilities into our Kytos NApps, combining RabbitMQ's robustness with aio-pika's asynchronous nature to build scalable and responsive distributed systems.
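As a rough illustration (the connection URL, queue name, and payload below are made up for the example), a minimal aio-pika publisher could look like this:

    import asyncio

    import aio_pika

    async def main():
        # Robust connection: aio-pika reconnects automatically if the broker restarts
        connection = await aio_pika.connect_robust("amqp://guest:guest@localhost/")
        async with connection:
            channel = await connection.channel()
            # One durable queue per consumer NApp, as the requirements describe
            queue = await channel.declare_queue("sdx_napp_events", durable=True)
            await channel.default_exchange.publish(
                aio_pika.Message(body=b'{"name": "kytos/topology.link_down"}'),
                routing_key=queue.name,
            )

    asyncio.run(main())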

########
Benefits
########

Real-time monitoring of interdomain link status.

Scalable solution with dynamically created queues.

Fault-tolerant design ensures persistent handling of link-down events.

Flexibility in queue management, allowing for dynamic addition or configuration based on settings.

Overall, this use case demonstrates how message queues can be effectively utilized for monitoring and managing interdomain link status changes in an SDX network infrastructure, ensuring timely detection and response to connectivity issues.

In conclusion, isolating the Broadcasting Events Napp and the Events Consumer Napp into separate microservices from the Message Queue Producer and Consumer Napps promotes flexibility, modularity, scalability, and resilience in the system's architecture. This design approach enables the system to adapt to changing requirements and technologies while maintaining robustness and efficiency in event broadcasting and consumption.

#############################################
Use Case: Interdomain Link Up/Down Monitoring
#############################################

Scenario:

In SDX, monitoring the status of interdomain links for connectivity and reliability is crucial. This use case involves setting up a system to detect link status changes (up/down) and notify consumers about these events through message queues.

Components:

Producer: Generates events based on link status changes.

Consumer: Monitors link status by consuming messages from the appropriate queues.

Implementation:

1. Link status queue initialization:

   - Each interdomain link has its own dedicated queue.
   - Queues are either dynamically created or configured based on settings.

2. Producer functionality:

   - Upon detecting a link-down event:

     - If the link-down event is persistent, the producer checks whether the corresponding queue is empty; if empty, it adds a new "[Link Down]" message to the single-element link status queue.
     - If the link-down event is not persistent, the producer keeps checking until the consumer reads and deletes the message.

   - Upon detecting a link-up event, the producer sends a "[Link Up]" message to override the existing message in the link status queue.

3. Consumer functionality:

   - Consumers must be aware of the queues they need to monitor for link status changes.
   - Consumers continuously monitor the designated queues for new messages indicating link status changes.
   - Upon receiving a message:

     - If the message is "[Link Down]", the consumer processes the link-down event.
     - If the message is "[Link Up]", the consumer processes the link-up event.

@italovalcy

Hi everybody, it seems like PR #465 didn't make it, but I see many important comments and discussions there! Very nice.

I'm also CCing @Auwate for his comments here! Austin has recently started working on a prototype for integrating Kytos and Kafka, and I suggested he read the blueprint proposal Luis started a while ago. After reading it, Austin has some comments. Austin, please, the floor is yours!

@Auwate

Auwate commented Nov 8, 2024

Hey everybody, good afternoon. I've worked on prototypes with Kytos and Kafka, focusing on areas such as serialization, SSL/TLS transmission, and Kafka deployments. Here's the overall analysis:

Strengths

  • Modular: Using two different NApps to facilitate message brokering allows the development of both to be streamlined without negatively affecting the other
  • Scalable: Switching RabbitMQ out for Kafka allows Kafka's horizontal scalability to shine, especially for peak data throughput
  • Fault-tolerant: Using two NApps allows one to go offline without negatively affecting the other, which can wait for it to come back online before continuing data brokering.

There are a lot more strengths to talk about, as it's a very well-structured application. However, my analysis focuses more on optimizing what's available and highlighting potential bottlenecks and weaknesses:

Analysis summary

The role of mq_producer is somewhat abstract, but assuming listen_events directly sends events to Kafka, the application's event path looks like this:

  • 1: Event is created in Kytos
  • 2: Event is consumed, filtered, serialized, and pushed to Kafka
  • 3: Operational tasks are handled by mq_producer, which includes operating topics, increasing/decreasing partitions, etc.
  • 4: Kafka load balances partitions onto different instances, as well as other internal processes
  • 5: Consumer applications poll from Kafka, handling retries and other necessities.

This is a solid approach, and provides a scalable way to stream events to consumer applications. However, there are some important tradeoffs to consider:

  • 1: Consumer NApps would have to install and use a Kafka library like kafka-python-ng or aiokafka.

Because they are polling directly from Kafka, they would need the necessary installations to do so, which may increase operational complexity (such as specifying the version to install, compatibility issues across Python versions, etc.).

A separate issue is standardizing the way consumer applications poll data. There are several ways we can approach this, starting with direct communication between Kafka and consumers. This is likely the most scalable way, as we can increase the number of Kafka instances in the cluster to support increased demand. However, it would also require us to provide a standardized way of communicating with Kafka. This can be solved by providing a library we publish to PyPI, which individuals install and use as an object or function. In addition, direct communication between a Kafka cluster and a consumer may introduce security concerns if any application can just connect and receive data.
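For reference, a minimal aiokafka consumer for the direct-consumption approach could look roughly like this (the topic name, group id, and bootstrap server are placeholders):

    import asyncio

    from aiokafka import AIOKafkaConsumer

    async def consume():
        consumer = AIOKafkaConsumer(
            "kytos_events",                      # placeholder topic name
            bootstrap_servers="localhost:9092",
            group_id="sdx-consumer",             # placeholder consumer group
            auto_offset_reset="earliest",
        )
        await consumer.start()
        try:
            async for msg in consumer:
                # each msg carries topic/partition/offset metadata plus the raw value
                print(msg.topic, msg.partition, msg.offset, msg.value)
        finally:
            await consumer.stop()

    asyncio.run(consume())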

The second way we could do this is through another NApp that acts as a proxy, sending data to consumers via their REST API. This continues Kytos's design around REST APIs, following a simple and straightforward approach. In addition, we could optimize it for network throughput via compression (like gzip) and possibly increase scalability. This would require much more development as it's an in-house option, but it could be worth it if security and ease of use for consumers are important factors in the overall architecture. For an example of what I'm talking about, here's a link to Walmart's MPS proxy system: https://blog.bytebytego.com/p/the-trillion-message-kafka-setup

  • 2: listen_events may need careful consideration on its development

Depending on whether or not we want high throughput of events, making listen_events loosely coupled between components is highly desirable. Specifically, the way it currently works is that the listen_events NApp listens for events, filters them, serializes the message, and sends it to Kafka. If the idea is to keep it to a single NApp, then making sure each portion (consuming events, filtering events, and serializing events) is multithreaded with asynchronous messaging is vital.

Looking at filtering, it's likely the easiest of the three to implement, since a set of allowed event types could be created and events filtered by membership in that set.

Next, looking at sending data, this will likely need to be developed carefully. Specifically, Kafka works best with batched messages, but getting acknowledgements may involve some work depending on the framework. kafka-python-ng is a traditional synchronous library, so using async with it might not be a good idea. Further research into aiokafka might be worthwhile, although it is a less-used framework.
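To make the batching and acknowledgement point concrete, here's a rough aiokafka producer sketch (the topic name and tuning values are arbitrary placeholders):

    import asyncio

    from aiokafka import AIOKafkaProducer

    async def publish(events: list[bytes]):
        producer = AIOKafkaProducer(
            bootstrap_servers="localhost:9092",
            linger_ms=50,            # let the client batch records for up to 50 ms
            max_batch_size=32768,    # maximum batch size in bytes
            acks="all",              # wait for in-sync replicas to acknowledge
        )
        await producer.start()
        try:
            # send() queues each record and returns a future; gathering the futures
            # waits for the broker acknowledgements of the whole batch.
            futures = [await producer.send("kytos_events", value=event) for event in events]
            await asyncio.gather(*futures)
        finally:
            await producer.stop()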

  • 3: listen_events may need careful consideration on serialization

A massive bottleneck for listen_events will likely be serialization. Possible serialization methods include Protobuf, Avro, or MessagePack. MessagePack looks very promising as it's simple like JSON but encodes to binary instead of strings. In addition, it's schema-less, just like JSON, which makes updating the serializer for new classes much easier.
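As a quick illustration of MessagePack's JSON-like usage (the event payload is made up):

    import msgpack

    event = {"name": "kytos/topology.link_down", "content": {"link_id": "aa:bb:cc"}}
    packed = msgpack.packb(event)        # bytes on the wire, typically smaller than JSON text
    restored = msgpack.unpackb(packed)   # back to a plain dict, no schema required
    assert restored == event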

  • 4: The Kafka cluster would likely need to be deployed on a platform like Docker-Compose

A major point of the application is that it should be able to temporarily store data if a NApp goes down. However, if the Kafka instance itself goes down, it would need to come back up and continue sending data. Something like Docker-Compose works great for specifying restarts and attaching volumes when necessary, while also being much more lightweight than something like Kubernetes.


Additional comments:

I understand that the NApp would need to be written in Python to connect with Kytos as a NApp. However, if possible, I believe Java is a really good option for connecting to Kafka. It has access to the Kafka Streams API, which enables fast, readable code that can do things like filtering with ease and offers exactly-once semantics. Here's a link to more information (it's based on Confluent, but the concepts remain the same): https://docs.confluent.io/platform/current/streams/introduction.html

@Auwate

Auwate commented Nov 8, 2024

Hello again all, I took a look at the MR that this references and I was looking at addressing some issues regarding the blueprint:

1: Why Kafka

In the above comment I assumed that we would be using Kafka (specifically Apache Kafka) but I wanted to explain why:

Kafka and RabbitMQ (while both message brokers) are fundamentally different and thus serve different needs. Specifically, RabbitMQ (supporting the AMQP protocol) is really good at providing complex asynchronous message queues between applications, specifically through its "exchange" feature. In addition, it provides several features that allow for more complex business logic, such as administrative tools (like a web UI), message priorities, message ordering, etc. However, it's not necessarily built for high-throughput, which is exactly what Kafka is good at. Specifically, Kafka gives up many of the features RabbitMQ gives you in exchange for being able to send out millions of messages a second, whereas RabbitMQ (according to https://aws.amazon.com/compare/the-difference-between-rabbitmq-and-kafka/) handles on the order of thousands of messages per second on a single instance.

2: Serialization

In the above comment I mentioned that serialization would be a major concern for the application. A serializing algorithm like json.dumps() would cause massive bottlenecks if throughput went too high, which led me to investigate various other algorithms.

A promising serializing algorithm was MessagePack, which has a similar syntax to JSON serialization. It promises increased speed and efficiency as it encodes directly to binary rather than to strings, decreasing overall network traffic and speeding up serialization. In addition, deserializing would be just as easy and would occur much faster than with JSON. However, there are much faster options like Protobuf, but they would require us to create and compile proto files, which may increase complexity for a moderate tangible benefit. Overall, it's a good option and works great with Kafka.
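A tiny, non-authoritative way to sanity-check the size difference for a given payload:

    import json

    import msgpack

    event = {"name": "kytos/topology.link_down", "content": {"link_id": "aa:bb:cc", "active": False}}
    json_bytes = json.dumps(event).encode()
    msgpack_bytes = msgpack.packb(event)
    print(len(json_bytes), len(msgpack_bytes))  # compare encoded sizes for this payload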

3: Retries

Vinicius mentioned concerns about what the NApp should do in case of outages where sending data fails. I like the idea of storing the data on the NApp and sending it once the cluster comes back online, but this requires some nuance to make it scalable and maintainable.

A simple option that maintains a level of fault tolerance would be to add events to a queue that supports size limits; see the sketch below. Arguably, if a component goes down for too long, that would be catastrophic and would require manual intervention, so keeping a buffer equal to the buffer size for aiokafka or kafka-python-ng would suffice. This would allow us to immediately send data out to Kafka once it comes back online.
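A minimal sketch of such a size-limited buffer (the drop-oldest policy and the cap below are just one possible choice):

    import asyncio

    MAX_BUFFERED = 10_000  # arbitrary cap; tune to the broker client's own buffer size

    buffer: asyncio.Queue = asyncio.Queue(maxsize=MAX_BUFFERED)

    def buffer_event(serialized: bytes) -> None:
        """Keep the newest events while the broker is unreachable."""
        try:
            buffer.put_nowait(serialized)
        except asyncio.QueueFull:
            buffer.get_nowait()        # drop the oldest event to make room
            buffer.put_nowait(serialized)

    async def drain(send) -> None:
        """Flush buffered events once the broker connection is back."""
        while not buffer.empty():
            await send(buffer.get_nowait())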

However, if the NApp itself goes down, it may be a good idea to just let the events go unprocessed. This could be amended based on key requirements, but a possible alternative could be a different NApp that listen_events would pull from in case it goes down. However, pushing every incoming event to the disk to prevent data loss would possibly slow down the NApp significantly, which may go against the NApp's objective of high-throughput, real-time streaming.

4: NApp possible design:

Here's a potential idea:

Because the logic is split into 4 separate steps, it may make logical sense to have a thread for each stage. Here's an example:

1: Getting data: A thread is used to continuously run async functions that listen for events. It would then pass any new events through a pipeline/queue to the filter

2: Filtering: A thread is used to await new data from the pipeline/queue and filter it based on preset configurations. It would then move the data into the next pipeline to the serializer

3: Serializing: A thread is used to await new data from the pipeline/queue and run serializing algorithms on it. This would likely be a potential bottleneck, so perhaps increasing efficiency here is optimal. After serializing, it puts it onto the next pipeline/queue to be sent out

4: Publishing data: A thread is used to await new data from the incoming pipeline/queue and send it out to Kafka. I believe having multiple asynchronous functions going at the same time would be a good idea to maximize the amount of data sent and to limit I/O bottlenecks.
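A rough sketch of this staged design, using asyncio tasks connected by queues in place of raw threads (all names below are illustrative):

    import asyncio

    import msgpack

    async def filter_stage(in_q: asyncio.Queue, out_q: asyncio.Queue, allowed: set[str]):
        # Stage 2: drop events whose names are not in the allowed set
        while True:
            event = await in_q.get()
            if event["name"] in allowed:
                await out_q.put(event)

    async def serialize_stage(in_q: asyncio.Queue, out_q: asyncio.Queue):
        # Stage 3: encode each event to bytes before it is published
        while True:
            await out_q.put(msgpack.packb(await in_q.get()))

    async def publish_stage(in_q: asyncio.Queue, send):
        # Stage 4: send() stands in for the Kafka client's publish call
        while True:
            await send(await in_q.get())

    async def run_pipeline(events_q: asyncio.Queue, send, allowed: set[str]):
        filtered_q: asyncio.Queue = asyncio.Queue(maxsize=1000)
        serialized_q: asyncio.Queue = asyncio.Queue(maxsize=1000)
        await asyncio.gather(
            filter_stage(events_q, filtered_q, allowed),
            serialize_stage(filtered_q, serialized_q),
            publish_stage(serialized_q, send),
        )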

@viniarck
Member Author

@Auwate, great start. Here's a partial review:

1: Why Kafka

However, it's not necessarily built for high-throughput, which is exactly what Kafka is good at.

Kafka is a solid choice generally, and it has a slight edge over RabbitMQ when focusing on throughput. But what is high-throughput for the use case we're going for?

  • a) In a scenario with 300 EVCs converging (which is not too far from prod-like scalability at AmLight) and which tends to have many events being published, it tends to generate approximately 1300 eps - events per second - (or 78k epm), which is a local peak situation that doesn't typically last for too long.

  • b) In a theoretical unbounded scenario, using my machine, with async tasks publishing events as fast as they can, it can generate up to 187k eps (or 11.22M epm).

Scenario b) is very far from any typical NApp that's currently being used in prod. So a message broker with a sustained throughput of around 250k eps, at an average message size for current usage, should operate relatively well without queuing too much. Which brings me back to the initial question: what's high-throughput for the intended supported cases, are we talking about a) or b)? And, equally important, how will it be used and consumed, and by which other applications? Kafka certainly can bridge the events in case b), but at what cost? How many nodes, how much storage and SSD would be needed for that kind of on-premises setup? Kafka is a bit infamous for having high OpEx.

Do we need to be ready for scenario b) out of the gate? This probably won't impact the NApp since it'll be written with performance in mind, but in terms of infrastructure, if we're aiming for high-throughput, what kind of infrastructure needs to be in place? And will AmLight be able to afford it, if not right away then in the future? Let's also highlight the overall costs of an initial Kafka cluster capable of sustaining 500k average-sized messages per second (double the amount of the current peak scenario for a typical AmLight topology) without too much delay. How many servers, CPUs, SSDs and NICs would be needed for both this case and for case b)? Scaling out on-premises is harder than in the cloud, but if we're aiming for high-throughput then we should also have an infrastructure to support it, otherwise it's picking a bazooka to kill a fly. So, let's exercise some scenarios here; the beauty of it is that it can gradually scale out, but to achieve the highest throughput, how expensive would it be to maintain? If the Operations team has the resources and is ready to maintain a Kafka cluster, let's go for it.

In the future, Kafka also has the potential to be used internally on Kytos-ng core.

2: Serialization

A serializing algorithm like json.dumps() would cause massive bottlenecks if throughput went too high, which led me to investigate various other algorithms. A promising serializing algorithm was MessagePack

Many event contents currently have Python objects; they haven't been standardized to always have a standard format. That was acceptable due to the guarantee that any consumer is still consuming in Python land in the same runtime. So, the first step is to make sure the format can be easily consumed, which brings me again to use case questions:

  • Which kind of consumers will we have, will they still be only Python or other languages too? If other languages, this NApp would also need to standardize and encode the objects accordingly, since changing the original events would require too much refactoring.

MessagePack is a good choice; it can be harder to debug encoding-wise and decoding-wise, but it can be worth it to save some bytes and gain a bit more throughput on the wire, and that's pivotal for the hot path. But let's also measure whether, with broker-supported compression algorithms, MessagePack is still needed; otherwise we can potentially stick with JSON with a lib that uses C extensions, plus message broker compression, and debugging would be easier. So, let's measure both and make a decision.

3: Retries

However, if the NApp itself goes down, it may be a good idea to just let the events go unprocessed

It's fair to only have retries and do nothing or discard if the broker isn't available, and often the message broker's lib provides this functionality. But, again, it goes back to the use case question:

  • For the current consumers and how it'll be used, how critical is it to potentially lose some messages?

If the application doesn't have any other fallback, which is fair, then we should make it clear that if consumers can't afford to lose messages, it's on network operators to ensure that the connectivity latency to the cluster is low, that the cluster is close by, and that it has many nodes, which will greatly minimize the chance of a full outage. That way it's crystal clear that whoever uses the NApp will understand the price to pay infrastructure-wise depending on the criticality of losing an event or not.

This can also be covered later if such an extra guarantee is needed, but either way, it needs to be clear from the start which kind of guarantees it provides. At the end of the day, locally you can only do so much, so it also won't completely save events indefinitely, but it can temporarily buffer things while the broker is out.

4: NApp possible design:

Because the logic is split into 4 separate steps, it may make logical sense to have a thread for each stage. Here's an example:

Newer kytos-ng org NApps, especially ones like this that are IO-based, are recommended to be entirely based on asyncio, not only for being more lightweight, but also to avoid out-of-order messages when subscribing to Kytos events. You can implement it all in this async event handler; it'll run in a task, and then for subsequent IO you have to make sure it's also asyncio compatible. For serialization, if it's heavy, you might need a custom threadpool, but I'd be surprised if a typical payload needed it. The handler below would result in a concurrent task being executed in the event loop:

    def setup(self):
        # bootstrap the producer connection or let it crash
        ...

    def shutdown(self):
        # stop the producer
        ...

    @alisten_to(".*")
    async def handle_events(self, event: KytosEvent):
        # filter
        # potentially reconnect or autoreconnect
        # await serialize
        # await publish
        ...

Overall, this is expected to suffice. From here you can also consider bootstrapping new producers in different processes if you ever start to hit CPU issues, but since this is supposed to be IO-bound, I'd be surprised if that's ever needed for this case.
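If serialization ever does become heavy enough to need the custom threadpool mentioned above, a minimal sketch of offloading it without blocking the event loop (the pool size and the use of msgpack are assumptions):

    import asyncio
    from concurrent.futures import ThreadPoolExecutor

    import msgpack

    _serializer_pool = ThreadPoolExecutor(max_workers=2)  # only if profiling shows it's needed

    async def serialize(event_content: dict) -> bytes:
        # Run the CPU-bound encoding in the custom pool, keeping the event loop free
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(_serializer_pool, msgpack.packb, event_content)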

5: Configuration

How would it be configured? And which APIs, if any, would it expose? Reading this as it is, if I were to have a Kafka consumer, for example, I wouldn't have an idea of how I'd be able to configure it to start streaming certain events to a given topic and partition (if needed).
