Skip to content

Developer Guide

Architecture Overview

RxFlow Pharmacy Assistant is built with a modular, scalable architecture designed for healthcare applications. The system follows clean architecture principles with clear separation of concerns.

graph TB
    subgraph "Presentation Layer"
        A[Streamlit UI]
        B[CLI Interface]
        C[API Endpoints]
    end

    subgraph "Application Layer"
        D[Conversation Manager]
        E[Workflow Engine]
        F[State Machine]
    end

    subgraph "Domain Layer"
        G[Patient Tools]
        H[Pharmacy Tools]
        I[RxNorm Tools]
        J[Escalation Tools]
    end

    subgraph "Infrastructure Layer"
        K[LLM Service]
        L[Mock Data Service]
        M[Logging Service]
        N[Configuration]
    end

    A --> D
    B --> E
    C --> F
    D --> G
    E --> H
    F --> I
    G --> K
    H --> L
    I --> M
    J --> N

Core Components

1. Conversation Management

The conversation system manages AI-powered interactions with multiple conversation types:

# rxflow/workflow/conversation_manager.py
class ConversationManager:
    """Manages different types of conversations and their lifecycles"""

    def __init__(self):
        self.conversation_types = {
            'simple': SimpleConversation,
            'refill': RefillConversation,
            'clinical': ClinicalConversation
        }
        self.active_conversations = {}

    def start_conversation(self, conversation_type: str, 
                         conversation_id: str) -> ConversationState:
        """Initialize a new conversation of specified type"""
        if conversation_type not in self.conversation_types:
            raise ValueError(f"Unknown conversation type: {conversation_type}")

        conversation_class = self.conversation_types[conversation_type]
        conversation = conversation_class(conversation_id)

        self.active_conversations[conversation_id] = conversation
        return conversation.get_state()

Extending Conversation Types

To create a custom conversation type:

from rxflow.workflow.conversation_manager import BaseConversation
from rxflow.workflow.state import ConversationState

class CustomConversation(BaseConversation):
    """Custom conversation for specific use case"""

    def __init__(self, conversation_id: str):
        super().__init__(conversation_id)
        self.conversation_type = "custom"

    def process_message(self, message: str) -> ConversationState:
        """Process user message and return updated state"""
        # Custom processing logic
        response = self._generate_response(message)

        # Update conversation state
        self.state.add_message("user", message)
        self.state.add_message("assistant", response)

        return self.state

    def _generate_response(self, message: str) -> str:
        """Generate appropriate response for custom conversation"""
        # Implement custom response logic
        pass

# Register the new conversation type
from rxflow.workflow.conversation_manager import ConversationManager

ConversationManager.register_conversation_type("custom", CustomConversation)

2. Tool Development

Tools provide the core functionality for pharmacy operations. Each tool follows a consistent pattern:

# Template for new tools
from typing import Dict, Any, Optional
from rxflow.utils.logger import get_logger

logger = get_logger(__name__)

class NewTool:
    """Template for creating new pharmacy tools"""

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        self.config = config or {}
        self.logger = logger

    def primary_function(self, query: str) -> Dict[str, Any]:
        """Main function of the tool

        Args:
            query (str): Input query or parameters

        Returns:
            Dict[str, Any]: Structured response with success/error status

        Examples:
            >>> tool = NewTool()
            >>> result = tool.primary_function("sample query")
            >>> print(result['success'])
            True
        """
        try:
            # Validate input
            if not query or not isinstance(query, str):
                return {
                    "success": False,
                    "error": "Invalid query parameter"
                }

            # Process request
            result = self._process_query(query)

            # Log successful operation
            self.logger.info(f"Tool operation successful", extra={
                "tool": self.__class__.__name__,
                "query": query[:100],  # Truncate for privacy
                "success": True
            })

            return {
                "success": True,
                "data": result,
                "timestamp": self._get_timestamp()
            }

        except Exception as e:
            # Log error
            self.logger.error(f"Tool operation failed", extra={
                "tool": self.__class__.__name__,
                "query": query[:100],
                "error": str(e)
            })

            return {
                "success": False,
                "error": f"Tool operation failed: {str(e)}"
            }

    def _process_query(self, query: str) -> Dict[str, Any]:
        """Internal processing logic - implement in subclass"""
        raise NotImplementedError("Subclasses must implement _process_query")

    def _get_timestamp(self) -> str:
        """Get current timestamp in ISO format"""
        from datetime import datetime
        return datetime.utcnow().isoformat() + "Z"

# Safety wrapper function for the tool
def safe_new_tool_operation(query: str) -> Dict[str, Any]:
    """Safe wrapper for NewTool operations

    Args:
        query (str): Input query

    Returns:
        Dict[str, Any]: Tool response with error handling

    Examples:
        >>> result = safe_new_tool_operation("test query")
        >>> if result.get("success"):
        ...     print("Operation successful")
    """
    try:
        tool = NewTool()
        return tool.primary_function(query)
    except Exception as e:
        return {
            "success": False,
            "error": f"Safety wrapper caught error: {str(e)}"
        }

Tool Integration Pattern

Integrate tools with the conversation system:

# rxflow/tools/tool_manager.py
from typing import Dict, Any, List
from .patient_history_tool import PatientHistoryTool, safe_medication_history
from .pharmacy_tools import PharmacyTool, safe_pharmacy_lookup

class ToolManager:
    """Manages available tools and their execution"""

    def __init__(self):
        self.tools = {
            "patient_history": PatientHistoryTool(),
            "pharmacy": PharmacyTool(),
            # Add new tools here
        }

        self.safe_functions = {
            "safe_medication_history": safe_medication_history,
            "safe_pharmacy_lookup": safe_pharmacy_lookup,
            # Add new safe functions here
        }

    def get_available_tools(self) -> List[str]:
        """Get list of available tool names"""
        return list(self.tools.keys())

    def execute_tool(self, tool_name: str, function_name: str, 
                    params: Dict[str, Any]) -> Dict[str, Any]:
        """Execute a specific tool function safely"""

        if tool_name not in self.tools:
            return {
                "success": False,
                "error": f"Unknown tool: {tool_name}"
            }

        tool = self.tools[tool_name]

        if not hasattr(tool, function_name):
            return {
                "success": False,
                "error": f"Tool {tool_name} does not have function {function_name}"
            }

        try:
            function = getattr(tool, function_name)
            result = function(**params)
            return result

        except Exception as e:
            return {
                "success": False,
                "error": f"Tool execution failed: {str(e)}"
            }

    def execute_safe_function(self, function_name: str, 
                            params: Any) -> Dict[str, Any]:
        """Execute a safe wrapper function"""

        if function_name not in self.safe_functions:
            return {
                "success": False,
                "error": f"Unknown safe function: {function_name}"
            }

        try:
            function = self.safe_functions[function_name]
            result = function(params)
            return result

        except Exception as e:
            return {
                "success": False,
                "error": f"Safe function execution failed: {str(e)}"
            }

3. State Management

The state machine manages conversation flow and context:

# rxflow/workflow/state_machine.py
from enum import Enum
from typing import Dict, Any, Optional, List
from dataclasses import dataclass, field

class ConversationState(Enum):
    """Possible conversation states"""
    INITIALIZED = "initialized"
    GATHERING_INFO = "gathering_info"
    PROCESSING = "processing"
    AWAITING_CONFIRMATION = "awaiting_confirmation"
    COMPLETED = "completed"
    ERROR = "error"

@dataclass
class ConversationContext:
    """Context data for conversations"""
    conversation_id: str
    user_id: Optional[str] = None
    patient_id: Optional[str] = None
    current_medication: Optional[str] = None
    selected_pharmacy: Optional[str] = None
    collected_data: Dict[str, Any] = field(default_factory=dict)
    conversation_history: List[Dict[str, str]] = field(default_factory=list)

class StateMachine:
    """Manages conversation state transitions"""

    def __init__(self, initial_state: ConversationState = ConversationState.INITIALIZED):
        self.current_state = initial_state
        self.context = ConversationContext("")
        self.state_handlers = {
            ConversationState.INITIALIZED: self._handle_initialized,
            ConversationState.GATHERING_INFO: self._handle_gathering_info,
            ConversationState.PROCESSING: self._handle_processing,
            ConversationState.AWAITING_CONFIRMATION: self._handle_awaiting_confirmation,
            ConversationState.COMPLETED: self._handle_completed,
            ConversationState.ERROR: self._handle_error,
        }

    def transition_to(self, new_state: ConversationState, 
                     context_updates: Optional[Dict[str, Any]] = None) -> bool:
        """Transition to a new state with optional context updates"""

        if not self._is_valid_transition(self.current_state, new_state):
            return False

        # Update context if provided
        if context_updates:
            for key, value in context_updates.items():
                if hasattr(self.context, key):
                    setattr(self.context, key, value)
                else:
                    self.context.collected_data[key] = value

        # Perform transition
        old_state = self.current_state
        self.current_state = new_state

        # Log state transition
        logger.info("State transition", extra={
            "conversation_id": self.context.conversation_id,
            "from_state": old_state.value,
            "to_state": new_state.value,
            "context_updates": list(context_updates.keys()) if context_updates else []
        })

        return True

    def process_input(self, user_input: str) -> Dict[str, Any]:
        """Process user input based on current state"""

        if self.current_state not in self.state_handlers:
            return {
                "success": False,
                "error": f"No handler for state: {self.current_state}"
            }

        handler = self.state_handlers[self.current_state]
        return handler(user_input)

    def _is_valid_transition(self, from_state: ConversationState, 
                           to_state: ConversationState) -> bool:
        """Check if state transition is valid"""
        valid_transitions = {
            ConversationState.INITIALIZED: [
                ConversationState.GATHERING_INFO,
                ConversationState.ERROR
            ],
            ConversationState.GATHERING_INFO: [
                ConversationState.PROCESSING,
                ConversationState.GATHERING_INFO,  # Stay in same state
                ConversationState.ERROR
            ],
            ConversationState.PROCESSING: [
                ConversationState.AWAITING_CONFIRMATION,
                ConversationState.COMPLETED,
                ConversationState.ERROR
            ],
            ConversationState.AWAITING_CONFIRMATION: [
                ConversationState.PROCESSING,  # User wants changes
                ConversationState.COMPLETED,
                ConversationState.ERROR
            ],
            ConversationState.COMPLETED: [
                ConversationState.INITIALIZED  # Start new conversation
            ],
            ConversationState.ERROR: [
                ConversationState.INITIALIZED,  # Restart
                ConversationState.GATHERING_INFO  # Retry
            ]
        }

        return to_state in valid_transitions.get(from_state, [])

Testing Framework

Unit Testing

RxFlow uses pytest for comprehensive testing:

# tests/test_patient_history_tool.py
import pytest
from unittest.mock import Mock, patch
from rxflow.tools.patient_history_tool import PatientHistoryTool, safe_medication_history

class TestPatientHistoryTool:
    """Test cases for PatientHistoryTool"""

    def setup_method(self):
        """Setup test fixtures"""
        self.tool = PatientHistoryTool()
        self.sample_patient_id = "12345"
        self.sample_medication = "lisinopril"

    def test_get_medication_history_success(self):
        """Test successful medication history retrieval"""
        query = f"{self.sample_patient_id}:{self.sample_medication}"

        with patch.object(self.tool, '_fetch_patient_data') as mock_fetch:
            mock_fetch.return_value = {
                "medications": [
                    {
                        "name": "lisinopril",
                        "strength": "10mg", 
                        "condition": "hypertension",
                        "last_filled": "2025-01-10"
                    }
                ]
            }

            result = self.tool.get_medication_history(query)

            assert result["success"] is True
            assert len(result["medications"]) == 1
            assert result["medications"][0]["name"] == "lisinopril"

    def test_get_medication_history_invalid_query(self):
        """Test handling of invalid query format"""
        invalid_query = "invalid_format"

        result = self.tool.get_medication_history(invalid_query)

        assert result["success"] is False
        assert "Invalid query format" in result["error"]

    def test_safe_medication_history_wrapper(self):
        """Test safety wrapper function"""
        query = f"{self.sample_patient_id}:{self.sample_medication}"

        with patch.object(PatientHistoryTool, 'get_medication_history') as mock_method:
            mock_method.return_value = {"success": True, "medications": []}

            result = safe_medication_history(query)

            assert result["success"] is True
            mock_method.assert_called_once_with(query)

    @pytest.mark.parametrize("adherence_score,expected_level", [
        (95, "Excellent"),
        (85, "Good"),
        (70, "Fair"),
        (50, "Poor"),
        (30, "Very Poor")
    ])
    def test_adherence_level_calculation(self, adherence_score, expected_level):
        """Test adherence level calculation with different scores"""
        query = f"{self.sample_patient_id}:{self.sample_medication}"

        with patch.object(self.tool, '_calculate_adherence_score') as mock_calc:
            mock_calc.return_value = adherence_score

            result = self.tool.check_adherence(query)

            assert result["adherence_level"] == expected_level

# Integration tests
class TestIntegration:
    """Integration tests for tool interactions"""

    def test_complete_refill_workflow(self):
        """Test complete prescription refill workflow"""
        from rxflow.workflow.refill_conversation import RefillConversation
        from rxflow.tools.patient_history_tool import PatientHistoryTool
        from rxflow.tools.pharmacy_tools import PharmacyTool

        # Mock external dependencies
        with patch.object(PatientHistoryTool, 'get_medication_history') as mock_history, \
             patch.object(PatientHistoryTool, 'check_adherence') as mock_adherence, \
             patch.object(PharmacyTool, 'find_nearby_pharmacies') as mock_pharmacies:

            # Setup mocks
            mock_history.return_value = {
                "success": True,
                "medications": [{"name": "lisinopril", "refills_remaining": 2}]
            }
            mock_adherence.return_value = {
                "adherence_score": 85,
                "adherence_level": "Good"
            }
            mock_pharmacies.return_value = {
                "success": True, 
                "pharmacies": [{"id": "CVS001", "name": "CVS Pharmacy"}]
            }

            # Test workflow
            conversation = RefillConversation("test_123")
            result = conversation.process_message(
                "I need to refill my blood pressure medication"
            )

            assert result.current_state != "error"
            assert "lisinopril" in str(result.context.collected_data)

# Performance tests
class TestPerformance:
    """Performance benchmarks for critical operations"""

    def test_medication_lookup_performance(self):
        """Test medication lookup response time"""
        import time

        tool = PatientHistoryTool()
        query = "12345:lisinopril"

        start_time = time.time()
        result = tool.get_medication_history(query)
        end_time = time.time()

        response_time = end_time - start_time

        # Should complete within 2 seconds
        assert response_time < 2.0
        assert result is not None

# Run tests with: pytest tests/ -v --cov=rxflow --cov-report=html

Mocking External Services

For testing without external dependencies:

# tests/conftest.py
import pytest
from unittest.mock import Mock

@pytest.fixture
def mock_llm_service():
    """Mock LLM service for testing"""
    mock = Mock()
    mock.generate_response.return_value = "Mocked LLM response"
    return mock

@pytest.fixture  
def mock_patient_data():
    """Mock patient data for testing"""
    return {
        "12345": {
            "medications": [
                {
                    "name": "lisinopril",
                    "strength": "10mg",
                    "condition": "hypertension", 
                    "last_filled": "2025-01-10",
                    "refills_remaining": 2
                }
            ],
            "allergies": [
                {
                    "allergen": "penicillin",
                    "reaction": "rash",
                    "severity": "Moderate"
                }
            ]
        }
    }

@pytest.fixture
def mock_pharmacy_data():
    """Mock pharmacy data for testing"""
    return {
        "90210": [
            {
                "id": "CVS001",
                "name": "CVS Pharmacy #1234",
                "address": "123 Main St",
                "distance": 1.2,
                "rating": 4.5
            }
        ]
    }

Configuration Management

Environment-based Configuration

# rxflow/config/settings.py
import os
from typing import Dict, Any, Optional
from dataclasses import dataclass

@dataclass
class DatabaseConfig:
    """Database configuration settings"""
    host: str = "localhost"
    port: int = 5432
    username: str = "rxflow"
    password: str = ""
    database: str = "rxflow_dev"

@dataclass 
class LLMConfig:
    """LLM service configuration"""
    provider: str = "openai"  # openai, anthropic, etc.
    api_key: str = ""
    model: str = "gpt-4"
    temperature: float = 0.1
    max_tokens: int = 1000

@dataclass
class SecurityConfig:
    """Security and encryption settings"""
    secret_key: str = ""
    encryption_key: str = ""
    jwt_expiry_hours: int = 24
    rate_limit_per_minute: int = 60

class Settings:
    """Application settings with environment overrides"""

    def __init__(self, environment: str = "development"):
        self.environment = environment
        self.load_environment_variables()

        # Configuration objects
        self.database = DatabaseConfig(
            host=os.getenv("DB_HOST", "localhost"),
            port=int(os.getenv("DB_PORT", "5432")),
            username=os.getenv("DB_USERNAME", "rxflow"),
            password=os.getenv("DB_PASSWORD", ""),
            database=os.getenv("DB_DATABASE", f"rxflow_{environment}")
        )

        self.llm = LLMConfig(
            provider=os.getenv("LLM_PROVIDER", "openai"),
            api_key=os.getenv("LLM_API_KEY", ""),
            model=os.getenv("LLM_MODEL", "gpt-4"),
            temperature=float(os.getenv("LLM_TEMPERATURE", "0.1")),
            max_tokens=int(os.getenv("LLM_MAX_TOKENS", "1000"))
        )

        self.security = SecurityConfig(
            secret_key=os.getenv("SECRET_KEY", "dev-secret-key"),
            encryption_key=os.getenv("ENCRYPTION_KEY", ""),
            jwt_expiry_hours=int(os.getenv("JWT_EXPIRY_HOURS", "24")),
            rate_limit_per_minute=int(os.getenv("RATE_LIMIT_PER_MINUTE", "60"))
        )

    def load_environment_variables(self):
        """Load environment-specific variables from .env files"""
        env_file = f".env.{self.environment}"

        if os.path.exists(env_file):
            from dotenv import load_dotenv
            load_dotenv(env_file)
        elif os.path.exists(".env"):
            from dotenv import load_dotenv  
            load_dotenv(".env")

    def validate_configuration(self) -> Dict[str, Any]:
        """Validate all configuration settings"""
        validation_errors = []

        # Validate required LLM settings
        if not self.llm.api_key:
            validation_errors.append("LLM_API_KEY is required")

        # Validate security settings for production
        if self.environment == "production":
            if self.security.secret_key == "dev-secret-key":
                validation_errors.append("Production SECRET_KEY must be set")

            if not self.security.encryption_key:
                validation_errors.append("Production ENCRYPTION_KEY is required")

        # Validate database connection
        if self.environment != "test":
            if not self.database.password and self.environment == "production":
                validation_errors.append("Production database password required")

        return {
            "valid": len(validation_errors) == 0,
            "errors": validation_errors
        }

# Usage
settings = Settings(os.getenv("ENVIRONMENT", "development"))
validation = settings.validate_configuration()

if not validation["valid"]:
    raise ValueError(f"Configuration errors: {validation['errors']}")

Feature Flags

Implement feature flags for gradual rollouts:

# rxflow/config/feature_flags.py
from typing import Dict, Any, Optional
import json
import os

class FeatureFlags:
    """Manages feature flags for gradual feature rollouts"""

    def __init__(self, config_file: Optional[str] = None):
        self.config_file = config_file or "feature_flags.json"
        self.flags = self._load_flags()

    def _load_flags(self) -> Dict[str, Any]:
        """Load feature flags from configuration file"""
        default_flags = {
            "enhanced_drug_checking": True,
            "multi_pharmacy_comparison": True,  
            "advanced_adherence_analytics": False,
            "integration_epic_ehr": False,
            "ai_clinical_recommendations": True,
            "real_time_inventory": False,
            "patient_mobile_app": False
        }

        if os.path.exists(self.config_file):
            try:
                with open(self.config_file, 'r') as f:
                    file_flags = json.load(f)
                    default_flags.update(file_flags)
            except Exception as e:
                print(f"Error loading feature flags: {e}")

        return default_flags

    def is_enabled(self, feature_name: str, user_id: Optional[str] = None) -> bool:
        """Check if a feature is enabled for a user"""

        # Check if feature exists
        if feature_name not in self.flags:
            return False

        flag_config = self.flags[feature_name]

        # Simple boolean flag
        if isinstance(flag_config, bool):
            return flag_config

        # Complex flag with rollout percentage or user targeting
        if isinstance(flag_config, dict):

            # Check if feature is globally enabled
            if not flag_config.get("enabled", False):
                return False

            # Check user whitelist
            whitelist = flag_config.get("whitelist_users", [])
            if user_id and user_id in whitelist:
                return True

            # Check rollout percentage
            rollout_percentage = flag_config.get("rollout_percentage", 0)
            if rollout_percentage > 0 and user_id:
                # Simple hash-based percentage rollout
                user_hash = hash(user_id) % 100
                return user_hash < rollout_percentage

            return rollout_percentage == 100

        return False

    def get_flag_value(self, feature_name: str, default_value: Any = None) -> Any:
        """Get the raw value of a feature flag"""
        return self.flags.get(feature_name, default_value)

    def set_flag(self, feature_name: str, value: Any) -> None:
        """Set a feature flag value (for testing)"""
        self.flags[feature_name] = value

# Usage throughout the application
feature_flags = FeatureFlags()

# In conversation processing
if feature_flags.is_enabled("ai_clinical_recommendations", user_id):
    # Enable AI clinical recommendations
    recommendations = generate_clinical_recommendations(patient_data)

# In pharmacy tools
if feature_flags.is_enabled("real_time_inventory", user_id):
    # Use real-time inventory checking
    inventory = check_real_time_inventory(pharmacy_id, medication)
else:
    # Use cached inventory data
    inventory = check_cached_inventory(pharmacy_id, medication)

Deployment

Docker Configuration

# Dockerfile
FROM python:3.11-slim

# Set working directory
WORKDIR /app

# Install system dependencies
RUN apt-get update && apt-get install -y \
    gcc \
    && rm -rf /var/lib/apt/lists/*

# Copy dependency files
COPY pyproject.toml poetry.lock ./

# Install Poetry and dependencies
RUN pip install poetry && \
    poetry config virtualenvs.create false && \
    poetry install --no-dev

# Copy application code
COPY . .

# Create non-root user
RUN useradd -m -u 1001 rxflow && \
    chown -R rxflow:rxflow /app
USER rxflow

# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD python -c "import requests; requests.get('http://localhost:8000/health')" || exit 1

# Expose port
EXPOSE 8000

# Start application
CMD ["python", "-m", "rxflow.main"]

Docker Compose for Development

# docker-compose.yml
version: '3.8'

services:
  rxflow-app:
    build: .
    ports:
      - "8000:8000"
      - "8501:8501"  # Streamlit port
    environment:
      - ENVIRONMENT=development
      - LLM_API_KEY=${LLM_API_KEY}
      - DB_HOST=postgres
      - DB_PASSWORD=rxflow_dev_pass
    depends_on:
      - postgres
      - redis
    volumes:
      - ./logs:/app/logs
      - ./data:/app/data
    networks:
      - rxflow-network

  postgres:
    image: postgres:15
    environment:
      - POSTGRES_DB=rxflow_dev
      - POSTGRES_USER=rxflow
      - POSTGRES_PASSWORD=rxflow_dev_pass
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data
      - ./init.sql:/docker-entrypoint-initdb.d/init.sql
    networks:
      - rxflow-network

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    networks:
      - rxflow-network

  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
      - ./ssl:/etc/nginx/ssl
    depends_on:
      - rxflow-app
    networks:
      - rxflow-network

volumes:
  postgres_data:
  redis_data:

networks:
  rxflow-network:
    driver: bridge

Kubernetes Deployment

# k8s/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: rxflow-app
  labels:
    app: rxflow-app
spec:
  replicas: 3
  selector:
    matchLabels:
      app: rxflow-app
  template:
    metadata:
      labels:
        app: rxflow-app
    spec:
      containers:
      - name: rxflow-app
        image: rxflow:latest
        ports:
        - containerPort: 8000
        env:
        - name: ENVIRONMENT
          value: "production"
        - name: LLM_API_KEY
          valueFrom:
            secretKeyRef:
              name: rxflow-secrets
              key: llm-api-key
        - name: DB_HOST
          value: "postgres-service"
        - name: DB_PASSWORD
          valueFrom:
            secretKeyRef:
              name: rxflow-secrets
              key: db-password
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /ready
            port: 8000
          initialDelaySeconds: 5
          periodSeconds: 5
        resources:
          requests:
            memory: "512Mi"
            cpu: "250m"
          limits:
            memory: "1Gi"
            cpu: "500m"

---
apiVersion: v1
kind: Service
metadata:
  name: rxflow-service
spec:
  selector:
    app: rxflow-app
  ports:
  - protocol: TCP
    port: 80
    targetPort: 8000
  type: LoadBalancer

Monitoring and Observability

Structured Logging

# rxflow/utils/logger.py
import logging
import json
import sys
from datetime import datetime
from typing import Dict, Any, Optional

class StructuredLogger:
    """Structured logging for better observability"""

    def __init__(self, name: str, level: str = "INFO"):
        self.logger = logging.getLogger(name)
        self.logger.setLevel(getattr(logging, level.upper()))

        # Create structured formatter
        formatter = StructuredFormatter()

        # Console handler
        console_handler = logging.StreamHandler(sys.stdout)
        console_handler.setFormatter(formatter)
        self.logger.addHandler(console_handler)

        # File handler for production
        if os.getenv("ENVIRONMENT") == "production":
            file_handler = logging.FileHandler("logs/rxflow.log")
            file_handler.setFormatter(formatter)
            self.logger.addHandler(file_handler)

    def info(self, message: str, **kwargs):
        """Log info level message with structured data"""
        self.logger.info(message, extra={"structured_data": kwargs})

    def warning(self, message: str, **kwargs):
        """Log warning level message with structured data"""
        self.logger.warning(message, extra={"structured_data": kwargs})

    def error(self, message: str, **kwargs):
        """Log error level message with structured data"""
        self.logger.error(message, extra={"structured_data": kwargs})

class StructuredFormatter(logging.Formatter):
    """Custom formatter for structured logging"""

    def format(self, record):
        log_entry = {
            "timestamp": datetime.utcnow().isoformat() + "Z",
            "level": record.levelname,
            "message": record.getMessage(),
            "logger": record.name,
            "module": record.module,
            "function": record.funcName,
            "line": record.lineno
        }

        # Add structured data if available
        if hasattr(record, "structured_data"):
            log_entry.update(record.structured_data)

        # Add exception info if present
        if record.exc_info:
            log_entry["exception"] = self.formatException(record.exc_info)

        return json.dumps(log_entry)

# Usage throughout application
logger = StructuredLogger(__name__)

logger.info("Patient medication lookup", 
           patient_id="12345",
           medication="lisinopril", 
           lookup_type="history",
           response_time_ms=150)

Metrics Collection

# rxflow/utils/metrics.py
import time
from functools import wraps
from typing import Dict, Any, Callable
from collections import defaultdict, Counter
from threading import Lock

class MetricsCollector:
    """Collects and tracks application metrics"""

    def __init__(self):
        self.counters = Counter()
        self.histograms = defaultdict(list)
        self.gauges = {}
        self.lock = Lock()

    def increment_counter(self, name: str, value: int = 1, tags: Dict[str, str] = None):
        """Increment a counter metric"""
        with self.lock:
            metric_name = self._build_metric_name(name, tags)
            self.counters[metric_name] += value

    def record_histogram(self, name: str, value: float, tags: Dict[str, str] = None):
        """Record a histogram value"""
        with self.lock:
            metric_name = self._build_metric_name(name, tags)
            self.histograms[metric_name].append(value)

    def set_gauge(self, name: str, value: float, tags: Dict[str, str] = None):
        """Set a gauge value"""
        with self.lock:
            metric_name = self._build_metric_name(name, tags)
            self.gauges[metric_name] = value

    def _build_metric_name(self, name: str, tags: Dict[str, str] = None) -> str:
        """Build metric name with tags"""
        if not tags:
            return name

        tag_string = ",".join(f"{k}={v}" for k, v in sorted(tags.items()))
        return f"{name}[{tag_string}]"

    def get_metrics_summary(self) -> Dict[str, Any]:
        """Get summary of all metrics"""
        with self.lock:
            return {
                "counters": dict(self.counters),
                "histograms": {
                    name: {
                        "count": len(values),
                        "min": min(values) if values else 0,
                        "max": max(values) if values else 0,
                        "avg": sum(values) / len(values) if values else 0
                    }
                    for name, values in self.histograms.items()
                },
                "gauges": dict(self.gauges)
            }

# Global metrics collector
metrics = MetricsCollector()

def track_performance(metric_name: str, tags: Dict[str, str] = None):
    """Decorator to track function performance"""
    def decorator(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args, **kwargs):
            start_time = time.time()

            try:
                result = func(*args, **kwargs)

                # Record success
                success_tags = (tags or {}).copy()
                success_tags["status"] = "success"
                metrics.increment_counter(f"{metric_name}.calls", tags=success_tags)

                return result

            except Exception as e:
                # Record error
                error_tags = (tags or {}).copy()
                error_tags["status"] = "error"
                error_tags["error_type"] = type(e).__name__
                metrics.increment_counter(f"{metric_name}.calls", tags=error_tags)
                raise

            finally:
                # Record execution time
                execution_time = time.time() - start_time
                metrics.record_histogram(f"{metric_name}.duration", 
                                       execution_time * 1000, tags=tags)

        return wrapper
    return decorator

# Usage
@track_performance("patient_history_lookup", tags={"tool": "patient_history"})
def get_medication_history(self, query: str) -> Dict[str, Any]:
    # Function implementation
    pass

# Track business metrics
metrics.increment_counter("prescription_refills", tags={"pharmacy": "CVS001"})
metrics.record_histogram("adherence_scores", 85.5, tags={"medication": "lisinopril"})
metrics.set_gauge("active_conversations", 23)

Security Considerations

Data Protection

# rxflow/utils/encryption.py
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
import base64
import os
from typing import Union

class DataEncryption:
    """Handles encryption/decryption of sensitive data"""

    def __init__(self, password: str = None):
        if password is None:
            password = os.getenv("ENCRYPTION_KEY", "default-dev-key")

        self.key = self._derive_key(password)
        self.cipher = Fernet(self.key)

    def _derive_key(self, password: str) -> bytes:
        """Derive encryption key from password"""
        password_bytes = password.encode()
        salt = b"rxflow_salt_2025"  # In production, use random salt per encryption

        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,
            salt=salt,
            iterations=100000,
        )

        key = base64.urlsafe_b64encode(kdf.derive(password_bytes))
        return key

    def encrypt(self, data: Union[str, bytes]) -> str:
        """Encrypt sensitive data"""
        if isinstance(data, str):
            data = data.encode()

        encrypted_data = self.cipher.encrypt(data)
        return base64.urlsafe_b64encode(encrypted_data).decode()

    def decrypt(self, encrypted_data: str) -> str:
        """Decrypt sensitive data"""
        encrypted_bytes = base64.urlsafe_b64decode(encrypted_data.encode())
        decrypted_data = self.cipher.decrypt(encrypted_bytes)
        return decrypted_data.decode()

# Usage for sensitive patient data
encryption = DataEncryption()

# Encrypt patient ID before storing in logs
encrypted_patient_id = encryption.encrypt("patient_12345")
logger.info("Patient lookup", patient_id_encrypted=encrypted_patient_id)

# Encrypt medication names in transit
encrypted_medication = encryption.encrypt("lisinopril")

Input Validation

# rxflow/utils/validation.py
import re
from typing import Any, Dict, List, Optional, Union
from dataclasses import dataclass

@dataclass
class ValidationRule:
    """Defines a validation rule"""
    field: str
    required: bool = True
    min_length: Optional[int] = None
    max_length: Optional[int] = None
    pattern: Optional[str] = None
    allowed_values: Optional[List[str]] = None
    custom_validator: Optional[callable] = None

class InputValidator:
    """Validates user inputs for security and data integrity"""

    def __init__(self):
        self.patient_id_pattern = r"^[A-Za-z0-9]{1,20}$"
        self.medication_name_pattern = r"^[A-Za-z0-9\s\-\.]{1,100}$"
        self.zip_code_pattern = r"^\d{5}(-\d{4})?$"

    def validate_patient_lookup(self, query: str) -> Dict[str, Any]:
        """Validate patient lookup query"""

        if not query or not isinstance(query, str):
            return {
                "valid": False,
                "errors": ["Query must be a non-empty string"]
            }

        # Check for potential injection attempts
        dangerous_patterns = [
            r"[<>\"'`]",  # HTML/SQL injection characters
            r"javascript:",  # XSS attempts
            r"data:",  # Data URI schemes
            r"\.\./",  # Path traversal
        ]

        for pattern in dangerous_patterns:
            if re.search(pattern, query, re.IGNORECASE):
                return {
                    "valid": False,
                    "errors": ["Query contains potentially dangerous characters"]
                }

        # Validate format (patient_id:medication or just patient_id)
        parts = query.split(":")
        if len(parts) > 2:
            return {
                "valid": False,
                "errors": ["Invalid query format. Use 'patient_id' or 'patient_id:medication'"]
            }

        # Validate patient ID
        patient_id = parts[0].strip()
        if not re.match(self.patient_id_pattern, patient_id):
            return {
                "valid": False,
                "errors": ["Invalid patient ID format"]
            }

        # Validate medication name if provided
        if len(parts) == 2:
            medication = parts[1].strip()
            if medication and not re.match(self.medication_name_pattern, medication):
                return {
                    "valid": False,
                    "errors": ["Invalid medication name format"]
                }

        return {"valid": True, "errors": []}

    def validate_pharmacy_lookup(self, zip_code: str) -> Dict[str, Any]:
        """Validate pharmacy lookup by ZIP code"""

        if not zip_code or not isinstance(zip_code, str):
            return {
                "valid": False,
                "errors": ["ZIP code must be provided"]
            }

        zip_code = zip_code.strip()

        if not re.match(self.zip_code_pattern, zip_code):
            return {
                "valid": False,
                "errors": ["Invalid ZIP code format. Use XXXXX or XXXXX-XXXX"]
            }

        return {"valid": True, "errors": []}

    def sanitize_output(self, data: Any) -> Any:
        """Sanitize output data to prevent information leakage"""

        if isinstance(data, dict):
            sanitized = {}
            for key, value in data.items():
                # Remove sensitive fields from output
                if key.lower() in ["password", "ssn", "api_key", "token"]:
                    sanitized[key] = "[REDACTED]"
                else:
                    sanitized[key] = self.sanitize_output(value)
            return sanitized

        elif isinstance(data, list):
            return [self.sanitize_output(item) for item in data]

        elif isinstance(data, str):
            # Remove potential script tags or dangerous content
            sanitized = re.sub(r"<script.*?</script>", "[SCRIPT_REMOVED]", data, flags=re.IGNORECASE | re.DOTALL)
            return sanitized

        else:
            return data

# Usage in tools
validator = InputValidator()

def safe_patient_lookup(query: str) -> Dict[str, Any]:
    """Safe patient lookup with input validation"""

    # Validate input
    validation = validator.validate_patient_lookup(query)
    if not validation["valid"]:
        return {
            "success": False,
            "error": f"Validation failed: {', '.join(validation['errors'])}"
        }

    try:
        # Perform lookup
        tool = PatientHistoryTool()
        result = tool.get_medication_history(query)

        # Sanitize output
        sanitized_result = validator.sanitize_output(result)

        return sanitized_result

    except Exception as e:
        logger.error("Patient lookup failed", 
                    query_hash=hash(query), 
                    error=str(e))

        return {
            "success": False,
            "error": "Lookup failed due to system error"
        }

This comprehensive developer guide provides the foundation for extending and maintaining the RxFlow Pharmacy Assistant system. It covers architecture, testing, deployment, monitoring, and security considerations essential for healthcare applications.