Hi Community,
In the first part of this series, we examined the fundamentals of Interoperability on Python (IoP), specifically how it enables us to construct such interoperability elements as business services, processes, and operations using pure Python.
Now, we are ready to take things a step further. Real-world integration scenarios extend beyond simple message handoffs. They involve scheduled polling, custom message structures, decision logic, filtering, and configuration handling. In this article, we will delve into these more advanced IoP capabilities and demonstrate how to create and run a more complex interoperability flow using only Python.
To make it practical, we will build a comprehensive example: The Reddit Post Analyzer Production. The concept is straightforward: continuously retrieving the latest submissions from a chosen subreddit, filtering them based on popularity, adding extra tags to them, and sending them off for storage or further analysis.
The ultimate goal here is a reliable, self-running data ingestion pipeline. All major parts (the Business Service, Business Process, and Business Operation) are implemented in Python, showcasing how to use IoP as a Python-first integration methodology.
We will cover the following topics in this article:
✅ Defining message models using @dataclass
✅ Pulling live data from Reddit (Business Service)
✅ Filtering and enriching posts (Business Process)
✅ Handling the final delivery (Business Operation)
✅ Using structured logging across the pipeline
✅ Migrating IoP classes into IRIS using settings.py
✅ Overview of the IoP Director utility class
Let's begin with the application folder structure:
reddit_iop/
├─ messages.py
├─ services/
│  └─ service_reddit.py
├─ processes/
│  └─ process_reddit.py
├─ operations/
│  └─ operation_store.py
└─ settings.py
✅ Defining Message Models Using @dataclass (messages.py)
A central concept in any integration framework is the Message. In InterSystems IRIS, messages are first-class objects (they can be traced, inspected, and persisted as they move through production). One of the strengths of IoP is that we can define these messages as typed Python classes using @dataclass. It means that we can avoid creating ObjectScript message classes and instead benefit from clean, IDE-friendly Python models.
In IoP, Message is the base class for anything passed between components. We will build upon it to create our own strongly-typed message objects with the help of Python dataclasses. These data models will flow through the Business Service, Business Process, and Business Operation.
from iop import Message
from dataclasses import dataclass

@dataclass
class RedditPostMessage(Message):
    Title: str = ""
    URL: str = ""
    Author: str = ""
    Score: int = 0
    Tag: str = ""
    Status: str = ""
By using the @dataclass decorator on a class that inherits from iop.Message, we achieve several advanced benefits with minimal code:
- Automatic Properties: The @dataclass decorator automatically generates the __init__, __repr__, and comparison methods based on the type-hinted fields (Title: str, Score: int, etc.).
- Strong Typing: Type hints ensure that all components are aware of the expected data type, which improves code quality and prevents runtime errors.
- IoP Integration: The iop.Message inheritance ensures that the Python class is compiled into a persistent, ObjectScript-compatible class within InterSystems IRIS. It means that every message sent is automatically saved in the database for auditing and visual tracing (a key feature of the IRIS platform).
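To see these generated methods in action outside of IRIS, here is a minimal sketch using a plain stand-in base class (iop.Message is only available inside an IRIS environment, so the Message class below is a placeholder for illustration):

```python
from dataclasses import dataclass

# Stand-in for iop.Message so this example runs anywhere;
# in the production you would inherit from iop.Message instead.
class Message:
    pass

@dataclass
class RedditPostMessage(Message):
    Title: str = ""
    URL: str = ""
    Author: str = ""
    Score: int = 0
    Tag: str = ""
    Status: str = ""

msg = RedditPostMessage(Title="IRIS + Python", Score=42)
print(msg)  # auto-generated __repr__ lists every field
print(msg == RedditPostMessage(Title="IRIS + Python", Score=42))  # True: auto __eq__
```

Notice that we wrote no __init__, __repr__, or __eq__ ourselves; the decorator derived all three from the type-hinted fields.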
✅ Pulling Live Data from Reddit (service_reddit.py)
In an interoperability production, the Business Service acts as the gateway that brings data into the system. For our demonstration, the service will continuously poll Reddit’s /new.json endpoint and feed new submissions into the processing pipeline.
This component uses an inbound adapter to schedule and execute periodic API calls. Each time the adapter runs, it requests the latest posts from the specified subreddit, wraps the relevant fields in our RedditPostMessage dataclass, and forwards it to the next stage in the flow.
Key responsibilities of this service include:
- Initiating the data flow at defined intervals
- Connecting to Reddit and retrieving the newest submissions
- Converting raw API response data into a strongly-typed
RedditPostMessage
- Logging errors cleanly without interrupting the production
- Forwarding well-structured messages to the Business Process layer
This configuration mirrors a real-world integration pattern where an external data source continuously feeds the integration engine. By combining the IoP inbound adapter with a Python-based message model, we achieve a reliable and traceable ingest layer that is independent of ObjectScript.
from iop import BusinessService
from messages import RedditPostMessage
import requests

class RedditService(BusinessService):
    # Defaults; values injected from settings.py override these when the production loads
    subreddit = "technology"
    poll_interval = 10

    def get_adapter_type():
        return "Ens.InboundAdapter"

    def on_init(self):
        self.base_url = f"https://www.reddit.com/r/{self.subreddit}/new.json?limit=5"
        self.headers = {"User-Agent": "IRIS-IoP-Reddit-Agent"}

    def on_process_input(self, _):
        # The inbound adapter invokes this method on every polling interval,
        # so one fetch per call is enough; no blocking loop is needed here.
        try:
            response = requests.get(self.base_url, headers=self.headers, timeout=10)
            posts = response.json()["data"]["children"]
            for post in posts:
                data = post["data"]
                msg = RedditPostMessage(
                    Title=data["title"],
                    URL="https://reddit.com" + data["permalink"],
                    Author=data["author"],
                    Score=data["score"],
                )
                self.send_request_sync("RedditProcess", msg)
            self.log_info(f"[RedditService] Pulled {len(posts)} posts")
        except Exception as e:
            self.log_error(f"[RedditService ERROR] {e}")
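The extraction step inside on_process_input can be exercised offline with a canned payload. The sketch below mimics the shape of Reddit's /new.json response (the sample titles and permalinks are invented for illustration) and shows exactly which fields the service maps into each RedditPostMessage:

```python
# Sample payload shaped like Reddit's /new.json listing response.
sample_payload = {
    "data": {
        "children": [
            {"data": {"title": "IRIS tips", "permalink": "/r/technology/abc",
                      "author": "alice", "score": 12}},
            {"data": {"title": "Python news", "permalink": "/r/technology/def",
                      "author": "bob", "score": 3}},
        ]
    }
}

def extract_posts(payload: dict) -> list[dict]:
    """Flatten the nested Reddit listing into the fields the pipeline uses."""
    posts = []
    for child in payload["data"]["children"]:
        data = child["data"]
        posts.append({
            "Title": data["title"],
            "URL": "https://reddit.com" + data["permalink"],
            "Author": data["author"],
            "Score": data["score"],
        })
    return posts

print(extract_posts(sample_payload)[0]["URL"])
# https://reddit.com/r/technology/abc
```

Keeping the mapping logic this small makes it easy to unit-test the service without touching the network or IRIS.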
✅ Filtering and Enriching Posts (process_reddit.py)
The Business Process acts as the central nervous system of the production. This is where raw Reddit submissions are converted into meaningful information, and such key business rules as filtering, decision-making, and routing are executed.
Once the Business Service publishes a RedditPostMessage, the process assesses its contents and determines the next course of action.
In this example, the process checks whether the submission meets specific criteria (e.g., a minimum score or specific keywords). Posts that pass the filter are augmented and forwarded toward our Business Operation, while those that don’t are logged and dropped to keep the workflow clean and efficient.
from iop import BusinessProcess
from messages import RedditPostMessage

class RedditProcess(BusinessProcess):
    # Default threshold; the MIN_SCORE value from settings.py overrides it
    MIN_SCORE = 5

    def on_init(self):
        self.log_info("RedditProcess initialized")

    def on_request(self, request: RedditPostMessage) -> RedditPostMessage:
        title = request.Title
        score = request.Score
        self.log_info(f"[Process] Received: {title} | Score: {score}")
        min_score = int(self.MIN_SCORE)
        if score < min_score:
            self.log_info(f"[Process] Skipped low score ({score}) post")
            return RedditPostMessage(Status="FilteredLowScore")
        request.Tag = self._tag_topic(title)
        self.log_info(f"[Process] Tagged topic: {request.Tag}")
        return self.send_request_sync("RedditStoreOperation", request)

    def _tag_topic(self, title: str) -> str:
        keywords = {
            "AI": "Artificial Intelligence",
            "health": "Healthcare",
            "python": "Programming",
            "data": "Data Engineering",
        }
        for key, tag in keywords.items():
            if key.lower() in title.lower():
                return tag
        return "General"
- Filtering and Early Exit: The if score < min_score: block demonstrates conditional processing. If the message does not meet the requirements (low score), the process logs the skip and returns a RedditPostMessage with Status="FilteredLowScore", terminating that message's journey early without sending it downstream.
- Data Enrichment: The line request.Tag = self._tag_topic(title) shows how to modify the message object (which is a Python object in memory). The _tag_topic function performs simple business logic (categorization) and adds the result to the message, making the data more valuable for the storage component.
- Internal Methods: Python enables clean object-oriented design, as demonstrated by _tag_topic. This function, encapsulated within the class, keeps the main on_request method focused on orchestration.
- Continuing the Pipeline: If the post passes the filter, the augmented message is passed to the Operation using self.send_request_sync(), ensuring the flow remains synchronous for full traceability in the visual message tracer.
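Because the filtering and tagging rules are plain Python, they can also be expressed and tested as standalone functions, independent of the production. The sketch below copies the keyword table and the default threshold from RedditProcess:

```python
# Keyword-to-tag table, identical to the one inside RedditProcess._tag_topic.
KEYWORDS = {
    "AI": "Artificial Intelligence",
    "health": "Healthcare",
    "python": "Programming",
    "data": "Data Engineering",
}

def tag_topic(title: str) -> str:
    """Return the first matching topic tag, or 'General' if none match."""
    for key, tag in KEYWORDS.items():
        if key.lower() in title.lower():
            return tag
    return "General"

def passes_filter(score: int, min_score: int = 5) -> bool:
    """Mirror of the score check in on_request."""
    return score >= min_score

print(tag_topic("New Python 3.13 release"))  # Programming
print(tag_topic("Stock market update"))      # General
print(passes_filter(3))                      # False
```

Factoring the rules out like this is a useful pattern when the business logic starts to grow: the production component stays a thin orchestrator, while the rules get their own unit tests.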
✅ Handling the Final Delivery (operation_store.py)
The Business Operation is the final component in the production pipeline that interacts with external systems. This could be a database, a file system, a remote API, a message queue, or any other destination for processed data.
Once a message reaches this layer, it is considered fully processed and ready for persistence, storage, or further consumption. In our demonstration, the operation logs the post details and simulates saving them. In a real-world scenario, however, this is where you would execute SQL inserts, make REST calls, or send messages to other systems.
from iop import BusinessOperation
from messages import RedditPostMessage

class RedditStoreOperation(BusinessOperation):
    def on_init(self):
        self.log_info("Operation init")

    def on_message(self, request: RedditPostMessage) -> RedditPostMessage:
        self.log_info(
            f"[Store] Title: {request.Title} | Score: {request.Score} | Tag: {request.Tag}"
        )
        return RedditPostMessage(Status="Saved")
- Input Handling: The method signature on_message(self, request: RedditPostMessage) clearly defines the expected input type, reinforcing the contract set by the custom message.
- External Integration Point: This is the most crucial architectural point. All Python packages, including requests, numpy, pandas, and such specialized connectors as pyodbc or boto3, are available here. The developer is free to use the entire Python ecosystem to interact with any external system.
- Returning Status: The operation executes its task (mocked as logging) and returns a RedditPostMessage with Status="Saved" to the calling Process. Since the Service called the Process synchronously, this final status can be traced all the way back to the Service's on_process_input method, confirming end-to-end completion.
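To make the "real-world" variant concrete, here is a hedged sketch of what the body of on_message could do if it persisted posts with sqlite3. The table and column names are illustrative only (any DB-API driver, pyodbc connection, or REST call would fit the same slot):

```python
import sqlite3

# Illustrative persistence target; in production this could be IRIS SQL,
# pyodbc, or a remote API instead of an in-memory SQLite database.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE IF NOT EXISTS reddit_posts"
    "(title TEXT, url TEXT, author TEXT, score INTEGER, tag TEXT)"
)

def store_post(post: dict) -> str:
    """Insert one enriched post and return a status, mirroring on_message."""
    conn.execute(
        "INSERT INTO reddit_posts VALUES (?, ?, ?, ?, ?)",
        (post["Title"], post["URL"], post["Author"], post["Score"], post["Tag"]),
    )
    conn.commit()
    return "Saved"  # mirrors the Status field returned by RedditStoreOperation

status = store_post({
    "Title": "IRIS tips",
    "URL": "https://reddit.com/r/technology/abc",
    "Author": "alice",
    "Score": 12,
    "Tag": "General",
})
print(status)  # Saved
```

Swapping the logging mock for code like this is the only change needed; the message contract between Process and Operation stays exactly the same.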
✅ Using Structured Logging Across the Pipeline
The IoP framework includes its own logging system, and the Python API provides a way to leverage Python’s logging capabilities while fully integrating with IRIS logs.
Every IoP component inherits the logging functionality from its base class. You can access it directly via the logger property or use the built-in convenience methods, such as log_info(), log_warning(), and log_error(), to record messages at the appropriate level.
def on_init(self):
    self.log_info("Component initialized")
    self.log_error("An error occurred")
    self.log_warning("Warning message")
    self.log_alert("Critical alert")
    self.trace("Debug trace message")
    self.logger.info("Info via logger")
    self.logger.error("Error via logger")
✅ Migrating IoP Classes into IRIS Using settings.py
This is the “glue” that links your Python classes to production items in IRIS. The IoP framework utilizes the settings.py file to define and apply configuration details, which are then reflected directly in the InterSystems Management Portal.
from services.service_reddit import RedditService
from processes.process_reddit import RedditProcess
from operations.operation_store import RedditStoreOperation

CLASSES = {
    "Reddit.Ingestion.Service": RedditService,
    "Reddit.Ingestion.Process": RedditProcess,
    "Reddit.Ingestion.Store": RedditStoreOperation,
}

PRODUCTIONS = [
    {
        "Reddit.Ingestion.Production": {
            "@TestingEnabled": "false",
            "Item": [
                {
                    "@Name": "RedditService",
                    "@ClassName": "Reddit.Ingestion.Service",
                    "@Enabled": "true",
                    "Setting": [
                        {
                            "@Target": "Host",
                            "@Name": "subreddit",
                            "#text": "technology"
                        },
                        {
                            "@Target": "Host",
                            "@Name": "poll_interval",
                            "#text": "15"
                        },
                    ]
                },
                {
                    "@Name": "RedditProcess",
                    "@ClassName": "Reddit.Ingestion.Process",
                    "@Enabled": "true",
                    "Setting": [
                        {
                            "@Target": "Host",
                            "@Name": "MIN_SCORE",
                            "#text": "200"
                        }
                    ]
                },
                {
                    "@Name": "RedditStoreOperation",
                    "@ClassName": "Reddit.Ingestion.Store",
                    "@Enabled": "true"
                }
            ]
        }
    }
]
- Dynamic Setting Injection: The Setting array within the PRODUCTIONS definition is the mechanism for externalizing configuration. When the production is loaded, IRIS reads these values and makes them available to the Python components.
- Live Configuration Change: A significant advantage of using this framework is that administrators can modify subreddit, poll_interval, or MIN_SCORE directly in the InterSystems IRIS Management Portal without needing to restart the production. The on_init method will be triggered to re-read these settings, enabling dynamic operational control.
- Clear Structure: The CLASSES dictionary acts as a mapping layer, simplifying the connection between the ObjectScript-facing Production XML (@ClassName) and the underlying Python implementation. This abstraction is vital for large, multi-language projects.
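The PRODUCTIONS structure is essentially the production XML expressed as nested dictionaries, so it can be inspected with ordinary Python. The helper below is hypothetical (it is not part of the iop package) and walks an abbreviated copy of the definition to look up one item's setting:

```python
# Abbreviated copy of the PRODUCTIONS definition for illustration.
PRODUCTIONS = [{
    "Reddit.Ingestion.Production": {
        "@TestingEnabled": "false",
        "Item": [
            {
                "@Name": "RedditService",
                "@ClassName": "Reddit.Ingestion.Service",
                "@Enabled": "true",
                "Setting": [
                    {"@Target": "Host", "@Name": "subreddit", "#text": "technology"},
                    {"@Target": "Host", "@Name": "poll_interval", "#text": "15"},
                ],
            },
        ],
    }
}]

def get_setting(productions, production, item, name):
    """Hypothetical helper: find a named item's setting value, or None."""
    for entry in productions:
        for it in entry.get(production, {}).get("Item", []):
            if it.get("@Name") == item:
                for s in it.get("Setting", []):
                    if s.get("@Name") == name:
                        return s.get("#text")
    return None

print(get_setting(PRODUCTIONS, "Reddit.Ingestion.Production",
                  "RedditService", "poll_interval"))  # "15"
```

Note that every "#text" value is a string (mirroring XML attributes), which is why components should cast numeric settings, as with int(self.MIN_SCORE) in the process.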
Using the iop command, we can migrate our Python components directly into IRIS, making them available as production items in the InterSystems environment.
iop --migrate /path/to/reddit_iop/settings.py
✅ Overview of the IoP Director Utility Class
The IoP Director class provides utility methods to manage productions and components in IRIS directly from Python.
Production Management:
start_production(production_name=None) – Starts a production
stop_production() – Stops the current production
restart_production() – Restarts the current production
shutdown_production() – Gracefully shuts down the production
status_production() – Gets current production status (returns a dictionary)
Business Service Management:
create_business_service(target) – Creates a new service instance
get_business_service(target) – Retrieves an existing service instance
test_component(target, message=None, classname=None, body=None) – Tests any production component
Production Logging:
log_production() – Monitors logs in real time
log_production_top(top) – Displays the last N log entries
Production Configuration:
set_default_production(production_name) – Sets the default production
get_default_production() – Gets the current default production name
The Director class makes it easy to control, monitor, and test your IoP productions without leaving the Python environment.
To initiate a production, you can use the start_production() method of the Director class.
Production Overview
The following production has been created with the help of the iop --migrate command:
Below you can see the Business Service details (%classname refers to the class name in our service_reddit.py file, %module is derived from the Python file name, and %classpaths contains the path to the Python file).
To view the messages, click on the Business Service, then navigate to the Message tab.
Click on a message to view its visual trace.
Messages are received by RedditService, forwarded to RedditProcess, and then, based on the process logic, sent to RedditStoreOperation.
Conclusion
With Interoperability on Python, you can now do the following:
- Build complete production pipelines entirely in Python
- Leverage such modern tooling as dataclasses, type hints, and IDE support
- Integrate with virtually any API (Reddit, Twitter, FHIR, and more)
- Deploy Python components alongside ObjectScript components
It provides a solid foundation for creating real-time data pipelines in such domains as healthcare, finance, IoT, and social media (all powered by Python within InterSystems IRIS).
The Reddit Post Analyzer Production serves as a blueprint for advanced IoP development. By utilizing custom dataclass messages, implementing robust polling services, applying conditional logic and enrichment within the Business Process, and externalizing configuration through settings.py, we have revealed how Python can evolve from a utility language into a core pillar of a high-performance enterprise integration platform.
Thanks!