Commit a6061655 by krds-arun

Add MAF2.0 codebase

Made-with: Cursor
parent 2d8a9cf9
node_modules
dist
.git
.env
*.log
coverage
.turbo
.cursor
# MAFGateway 2026 - Environment template
# Copy to .env and adjust for your environment.
NODE_ENV=development
APP_NAME=MAFGateway-2026
PORT=3000
HOST=0.0.0.0
# Kafka (use localhost:9092 when running Kafka via docker-compose)
KAFKA_BROKERS=localhost:9092
KAFKA_CLIENT_ID=mafgateway-2026
KAFKA_CONSUMER_GROUP=mafgateway-workers
KAFKA_STREAM_GROUP=mafgateway-stream-processors
KAFKA_DLQ_GROUP=mafgateway-dlq-processors
KAFKA_TOPIC_JOBS=mafgateway.jobs
KAFKA_TOPIC_DLQ=mafgateway.jobs.dlq
# Observability
LOG_LEVEL=debug
ENABLE_TRACING=false
METRICS_PORT=9090
# MAFGateway 2026 – Deploy API and Worker to Azure (separate App Services / Container Apps)
# Worker runs heavy tasks in its own service; API only enqueues jobs.
#
# Secrets (GitHub repo): AZURE_CREDENTIALS, ACR_LOGIN_SERVER, ACR_USERNAME, ACR_PASSWORD
# Or use OIDC: AZURE_CLIENT_ID, AZURE_TENANT_ID, AZURE_SUBSCRIPTION_ID
# Variables: AZURE_API_APP_NAME, AZURE_WORKER_APP_NAME, AZURE_RESOURCE_GROUP
name: Deploy to Azure

on:
  push:
    branches: [main]
  workflow_dispatch:

env:
  ACR_NAME: '' # e.g. mafgatewayacr
  # The `env.*` context cannot read secrets implicitly; resolve the registry
  # host here once. Steps fall back to <ACR_NAME>.azurecr.io when unset.
  ACR_LOGIN_SERVER: ${{ secrets.ACR_LOGIN_SERVER }}
  AZURE_API_APP_NAME: '' # e.g. mafgateway-api
  AZURE_WORKER_APP_NAME: '' # e.g. mafgateway-worker (separate service for heavy tasks)
  AZURE_RESOURCE_GROUP: '' # e.g. rg-mafgateway-prod

jobs:
  build-and-push:
    runs-on: ubuntu-latest
    outputs:
      # NOTE(review): no steps with id meta-api / meta-worker exist, so these
      # outputs resolve empty — add docker/metadata-action steps or drop them.
      image-api: ${{ steps.meta-api.outputs.tags }}
      image-worker: ${{ steps.meta-worker.outputs.tags }}
    steps:
      - uses: actions/checkout@v4
      - name: Set up Docker Buildx
        uses: docker/setup-buildx-action@v3
      - name: Log in to ACR
        uses: docker/login-action@v3
        with:
          registry: ${{ env.ACR_LOGIN_SERVER || format('{0}.azurecr.io', env.ACR_NAME) }}
          username: ${{ secrets.ACR_USERNAME }}
          password: ${{ secrets.ACR_PASSWORD }}
      - name: Build and push API image
        uses: docker/build-push-action@v6
        with:
          context: .
          file: ./Dockerfile.api
          push: true
          tags: |
            ${{ env.ACR_LOGIN_SERVER || format('{0}.azurecr.io', env.ACR_NAME) }}/mafgateway-api:${{ github.sha }}
            ${{ env.ACR_LOGIN_SERVER || format('{0}.azurecr.io', env.ACR_NAME) }}/mafgateway-api:latest
      - name: Build and push Worker image
        uses: docker/build-push-action@v6
        with:
          context: .
          file: ./Dockerfile.worker
          push: true
          tags: |
            ${{ env.ACR_LOGIN_SERVER || format('{0}.azurecr.io', env.ACR_NAME) }}/mafgateway-worker:${{ github.sha }}
            ${{ env.ACR_LOGIN_SERVER || format('{0}.azurecr.io', env.ACR_NAME) }}/mafgateway-worker:latest

  deploy-api:
    needs: build-and-push
    runs-on: ubuntu-latest
    environment: production
    steps:
      - name: Azure Login
        uses: azure/login@v2
        with:
          creds: ${{ secrets.AZURE_CREDENTIALS }}
      - name: Deploy API to Azure App Service (or Container App)
        uses: azure/webapps-deploy@v3
        with:
          app-name: ${{ env.AZURE_API_APP_NAME }}
          # webapps-deploy@v3 calls this input resource-group-name, not resource-group
          resource-group-name: ${{ env.AZURE_RESOURCE_GROUP }}
          images: ${{ env.ACR_LOGIN_SERVER || format('{0}.azurecr.io', env.ACR_NAME) }}/mafgateway-api:${{ github.sha }}

  deploy-worker:
    needs: build-and-push
    runs-on: ubuntu-latest
    environment: production
    steps:
      - name: Azure Login
        uses: azure/login@v2
        with:
          creds: ${{ secrets.AZURE_CREDENTIALS }}
      - name: Deploy Worker to Azure App Service (separate – heavy tasks)
        uses: azure/webapps-deploy@v3
        with:
          app-name: ${{ env.AZURE_WORKER_APP_NAME }}
          resource-group-name: ${{ env.AZURE_RESOURCE_GROUP }}
          images: ${{ env.ACR_LOGIN_SERVER || format('{0}.azurecr.io', env.ACR_NAME) }}/mafgateway-worker:${{ github.sha }}
# MAFGateway 2026 - CI pipeline (build, typecheck, test)
name: CI

on:
  push:
    branches: [main, develop]
  pull_request:
    branches: [main, develop]

jobs:
  build:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      # pnpm must be installed before setup-node so its cache: "pnpm" works.
      - uses: pnpm/action-setup@v4
        with:
          version: 9
      - uses: actions/setup-node@v4
        with:
          node-version: "20"
          cache: "pnpm"
      - name: Install dependencies
        run: pnpm install --frozen-lockfile
      - name: Typecheck
        run: pnpm run typecheck
      - name: Build
        run: pnpm run build
      - name: Lint
        run: pnpm run lint
        # Lint problems are reported but do not fail the pipeline (yet).
        continue-on-error: true
node_modules
dist
.env
*.log
coverage
.turbo
.DS_Store
.kafka
shamefully-hoist=true
strict-peer-dependencies=false
auto-install-peers=true
# MAFGateway 2026 - API
# Multi-stage build: base (node + pnpm) → deps (manifest-only install for
# layer caching) → builder (compile shared + api) → runner (production image).
FROM node:20-alpine AS base
# Pin pnpm via corepack so builds are reproducible.
RUN corepack enable && corepack prepare pnpm@9.14.2 --activate
WORKDIR /app

# deps: install from manifests only so source edits don't bust this layer.
FROM base AS deps
COPY package.json pnpm-workspace.yaml .npmrc ./
COPY packages/shared/package.json packages/shared/
COPY apps/api/package.json apps/api/
RUN pnpm install --no-frozen-lockfile

# builder: bring in sources and compile the shared package, then the API.
FROM base AS builder
COPY --from=deps /app/node_modules ./node_modules
COPY --from=deps /app/package.json /app/pnpm-workspace.yaml /app/.npmrc ./
COPY tsconfig.base.json ./
COPY packages ./packages
COPY apps/api ./apps/api
# Re-run install so workspace packages are linked now that sources exist.
RUN pnpm install
RUN pnpm --filter @mafgateway/shared build && pnpm --filter api build

# runner: final image with node_modules, built packages, and built API only.
FROM base AS runner
ENV NODE_ENV=production
COPY --from=builder /app/node_modules ./node_modules
COPY --from=builder /app/package.json /app/pnpm-workspace.yaml ./
COPY --from=builder /app/packages ./packages
COPY --from=builder /app/apps/api ./apps/api
EXPOSE 3000
WORKDIR /app/apps/api
CMD ["node", "dist/index.js"]
# MAFGateway 2026 - Worker
# Same multi-stage layout as Dockerfile.api, but builds apps/worker.
# No EXPOSE: the worker only consumes from Kafka, it serves no HTTP traffic.
FROM node:20-alpine AS base
# Pin pnpm via corepack so builds are reproducible.
RUN corepack enable && corepack prepare pnpm@9.14.2 --activate
WORKDIR /app

# deps: manifest-only install for layer caching.
FROM base AS deps
COPY package.json pnpm-workspace.yaml .npmrc ./
COPY packages/shared/package.json packages/shared/
COPY apps/worker/package.json apps/worker/
RUN pnpm install --no-frozen-lockfile

# builder: copy sources, link workspace packages, compile shared + worker.
FROM base AS builder
COPY --from=deps /app/node_modules ./node_modules
COPY --from=deps /app/package.json /app/pnpm-workspace.yaml /app/.npmrc ./
COPY tsconfig.base.json ./
COPY packages ./packages
COPY apps/worker ./apps/worker
RUN pnpm install
RUN pnpm --filter @mafgateway/shared build && pnpm --filter worker build

# runner: production image with built output only.
FROM base AS runner
ENV NODE_ENV=production
COPY --from=builder /app/node_modules ./node_modules
COPY --from=builder /app/package.json /app/pnpm-workspace.yaml ./
COPY --from=builder /app/packages ./packages
COPY --from=builder /app/apps/worker ./apps/worker
WORKDIR /app/apps/worker
CMD ["node", "dist/index.js"]
# MAFGateway 2026
Fastify-based **modular monolith** in a simple monorepo. Easy to maintain now; straightforward to split into microservices later.
---
## Structure
```
MAF2.0/
├── apps/
│ ├── api/ # HTTP API
│ │ └── src/
│ │ ├── index.ts # Bootstrap, routes, load modules
│ │ ├── routes/ # Global: hello, health, jobs, metrics
│ │ └── modules/ # Domain modules (hello, auth, permits, …)
│ │ ├── hello/
│ │ ├── auth/
│ │ ├── permits/
│ │ └── ...
│ └── worker/ # Kafka consumer, background jobs
│ └── src/
│ └── handlers/
├── packages/
│ └── shared/ # Types, config, logger, Kafka client
├── scripts/ # kafka-start.sh, kafka-stop.sh
├── docs/
├── package.json
└── pnpm-workspace.yaml
```
- **One shared package** – `packages/shared` has types, config, logger, and Kafka client.
- **Domain modules** – Each module is a folder under `apps/api/src/modules/` with an `index.ts` that exports `{ name, register }`. Add to `registry.ts` to load it.
- **Extract later** – Copy a module folder + shared into a new app and expose the same API over HTTP.
---
## Quick start
```bash
pnpm install
pnpm run build
pnpm run dev:api # API (needs Kafka for job enqueue)
pnpm run dev:worker # Worker (consumes from Kafka)
```
**Kafka (Docker):**
```bash
docker compose up -d zookeeper kafka
# .env: KAFKA_BROKERS=localhost:9092
```
**Kafka (host):** Use `./scripts/kafka-start.sh` and `./scripts/kafka-stop.sh` if Kafka is installed under `.kafka/`. See **docs/INSTALL-KAFKA.md** for full install options.
---
## Endpoints
| Endpoint | Description |
|----------|-------------|
| `GET /` | Links to hello module |
| `GET /hello` | Redirects to `/api/v1/hello` |
| `GET /api/v1/hello` | Hello module – Hello + directory structure (reference example) |
| `POST /api/v1/hello/job` | Enqueue `hello.demo` job (API → Kafka → Worker) |
| `GET /health` | Liveness |
| `GET /metrics` | Metrics (placeholder) |
| `POST /api/v1/jobs/enqueue` | Enqueue job (body: `{ "jobType": "report.generate", "payload": {} }`) |
| `GET /api/v1/auth/me`, `POST /api/v1/auth/login` | Auth module (demo) |
---
## Scripts
| Command | Description |
|---------|-------------|
| `pnpm run build` | Build all packages and apps |
| `pnpm run dev:api` | Run API in dev |
| `pnpm run dev:worker` | Run Worker in dev |
| `pnpm run dev:all` | Run API + Worker |
| `pnpm run typecheck` | Typecheck all |
| `./scripts/kafka-start.sh` | Start Zookeeper + Kafka (host install) |
| `./scripts/kafka-stop.sh` | Stop Kafka + Zookeeper |
---
## Hello module (reference example)
The **hello** module (`apps/api/src/modules/hello/`) is the template for using the folder structure with routes and jobs.
| Route | Method | Description |
|-------|--------|-------------|
| `GET /api/v1/hello` | GET | Returns "Hello World" + directory structure map |
| `POST /api/v1/hello/job` | POST | Enqueues a `hello.demo` job; the **worker** processes it |
**Flow: API → Kafka → Worker**
1. API – `POST /api/v1/hello/job` calls `app.kafka.publishJob(job)`.
2. Kafka – Job is produced to topic `mafgateway.jobs`.
3. Worker – Dispatches by `jobType` to `apps/worker/src/handlers/hello.ts`.
**Use as template for a new module**
1. Copy `apps/api/src/modules/hello/` to e.g. `permits/`; export `default` from the module (same as `permitsModule`).
2. In `apps/api/src/modules/manifest.ts`: add `'permits'` to the `MODULE_NAMES` array.
3. In `packages/shared/src/config.ts`: add `permits: { name: 'permits', enabled: true, basePath: '/api/v1/permits' }` to `config.modules`.
4. If the module enqueues jobs: add the payload type in `packages/shared/src/types.ts`, register a builder in `apps/api/src/routes/job-registry.ts`, add a handler in `apps/worker/src/handlers/` and register it in `handlers/index.ts`.
No new package or workspace entry—new folder, one name in the manifest, and one entry per job type in the registries.
---
## Other domain modules
- **Permits** – Add `apps/api/src/modules/permits/index.ts` exporting `permitsModule: ModuleDefinition` (same pattern as auth), then add to `registry.ts`.
- **Incidents** – Same pattern; add `incidentsModule` and register.
- **Analytics** – Same pattern; for heavy work, enqueue jobs via `app.kafka.publishJob()` and handle in the worker.
- **Notifications** – Same pattern; for async sending, enqueue jobs to the worker.
---
## Queue architecture
```
Clients / APIs → API Gateway → Producer Service → Message Broker (Kafka)
┌─────────────────┼─────────────────┬─────────────────┐
▼ ▼ ▼ ▼
Worker Pool Stream Processors (retry/DLQ) Dead Letter Queue
│ │ │
▼ ▼ ▼
ResultStore ResultStore DLQ Processor
(DB/Cache) (DB/Cache) → ResultStore
```
- **API Gateway** – Fastify + gateway (request id, logging).
- **Producer Service** – Single path for enqueue; routes/modules use `app.producerService.enqueue(job)`.
- **Worker Pool** – Main consumer; heavy tasks; retry then DLQ.
- **Stream Processors** – Same topic, different group; stream-type jobs (e.g. analytics).
- **DLQ Processor** – Consumes DLQ topic; logs and stores for replay/alert.
- **ResultStore** – Sink for job results (in-memory by default; plug in DB/file).
See **docs/QUEUE-ARCHITECTURE.md** for details.
## Docs
- **docs/ARCHITECTURE.md** – Design, service boundaries, extracting to microservices.
- **docs/QUEUE-ARCHITECTURE.md** – Queue system (Gateway → Producer → Broker → Worker Pool / Stream / DLQ).
- **docs/HELLO-WORLD.md** – Hello module flow, request path, directory map.
- **docs/INSTALL-KAFKA.md** – Install Kafka (Docker or host).
- **docs/DEPLOYMENT-PLAN.md** – Azure deployment (API + Worker as separate App Services, Event Hubs).
{
"name": "api",
"version": "1.0.0",
"private": true,
"type": "module",
"main": "dist/index.js",
"scripts": {
"build": "tsc",
"dev": "tsx watch src/index.ts",
"start": "node dist/index.js",
"clean": "rm -rf dist",
"typecheck": "tsc --noEmit",
"lint": "eslint src --ext .ts",
"test": "echo 'tests'"
},
"dependencies": {
"@mafgateway/shared": "workspace:*",
"@fastify/cors": "^10.0.1",
"@fastify/helmet": "^12.0.1",
"@fastify/sensible": "^6.0.1",
"fastify": "^5.1.0"
},
"devDependencies": {
"tsx": "^4.19.2",
"typescript": "^5.7.2"
}
}
/**
* API Gateway / Load Balancer layer – single entry for all client traffic.
* Handles: request id, logging, and forwards to routes → Producer Service (for jobs).
*/
import type { FastifyInstance, FastifyRequest, FastifyReply } from 'fastify';
import { childLogger } from '@mafgateway/shared';
/**
 * Gateway hook: stamps every request with a requestId (taken from the
 * x-request-id header when present, otherwise freshly generated) and
 * logs it at debug level.
 */
export async function registerGateway(app: FastifyInstance): Promise<void> {
  const gatewayLog = childLogger({ module: 'gateway' });

  app.addHook('onRequest', async (request: FastifyRequest, _reply: FastifyReply) => {
    const incomingId = request.headers['x-request-id'] as string;
    const tracked = request as FastifyRequest & { requestId?: string };
    tracked.requestId = incomingId ?? crypto.randomUUID();
    gatewayLog.debug(
      { requestId: tracked.requestId, method: request.method, url: request.url },
      'Gateway: request'
    );
  });
}
/**
* MAFGateway 2026 - API Bootstrap
* Modular monolith entry; each module is loaded and can be extracted to a microservice later.
*/
import Fastify from 'fastify';
import cors from '@fastify/cors';
import helmet from '@fastify/helmet';
import sensible from '@fastify/sensible';
import { loadConfig, createLogger, childLogger, createKafkaJobClient, createProducerService } from '@mafgateway/shared';
import { registerGateway } from './gateway.js';
import { registerModules } from './modules/registry.js';
import { registerRoutes } from './routes/index.js';
/**
 * Builds and configures the Fastify app:
 * plugins (cors/helmet/sensible) → gateway hook → Kafka client + producer
 * service decorations → core routes → domain modules → global error handler.
 * Returns the app together with the loaded config and logger.
 */
async function buildApp() {
  const config = loadConfig();
  // Standalone pino logger; Fastify's built-in logger is disabled below so
  // all output flows through this one (pretty-printed outside production).
  const logger = createLogger({
    level: config.observability.logLevel,
    ...(config.env !== 'production' && {
      transport: {
        target: 'pino-pretty',
        options: { colorize: true },
      },
    }),
  });
  const app = Fastify({
    logger: false,
    requestIdHeader: 'x-request-id',
    requestIdLogLabel: 'requestId',
    trustProxy: true,
  });
  await app.register(cors, { origin: true });
  await app.register(helmet, { contentSecurityPolicy: false });
  await app.register(sensible);
  // API Gateway layer (request id, logging)
  await registerGateway(app);
  // Message Broker: Kafka client + Producer Service Layer (only path for enqueue)
  const kafkaClient = createKafkaJobClient(config.kafka);
  await kafkaClient.connect();
  app.decorate('kafka', kafkaClient);
  app.decorate('producerService', createProducerService(kafkaClient));
  // Close the broker connection when the server shuts down.
  app.addHook('onClose', async () => await kafkaClient.disconnect());
  await registerRoutes(app, config);
  await registerModules(app, config);
  // Global error handler: normalizes any thrown value into the ApiResponse
  // error shape; error message text is hidden in production.
  app.setErrorHandler((err: unknown, request, reply) => {
    const req = request as { requestId?: string };
    const log = childLogger({
      module: 'api',
      requestId: req.requestId,
    });
    const error = err instanceof Error ? err : new Error(String(err));
    log.error({ err: error }, error.message);
    // Honor statusCode/code carried by Fastify/http-errors style error objects.
    const statusCode = err && typeof err === 'object' && 'statusCode' in err ? Number((err as { statusCode?: number }).statusCode) : 500;
    const code = err && typeof err === 'object' && 'code' in err ? String((err as { code?: string }).code) : 'INTERNAL_ERROR';
    reply.status(statusCode).send({
      success: false,
      error: {
        code,
        message: config.env === 'production' ? 'Internal error' : error.message,
      },
      meta: { requestId: req.requestId },
    });
  });
  return { app, config, logger };
}

/** Boots the app and starts listening; exits the process on startup failure. */
async function start() {
  const { app, config, logger } = await buildApp();
  try {
    await app.listen({ port: config.app.port, host: config.app.host });
    logger.info(
      { port: config.app.port, host: config.app.host, env: config.env },
      `${config.app.name} listening`
    );
  } catch (err) {
    logger.error(err);
    process.exit(1);
  }
}

start();
/**
* Auth module – lives inside API. To extract: copy this folder + shared to a new service.
*/
import type { FastifyInstance, FastifyRequest, FastifyReply } from 'fastify';
import type { ModuleDefinition, ModuleContext, ApiResponse } from '@mafgateway/shared';
import { childLogger } from '@mafgateway/shared';
const logger = childLogger({ module: 'auth' });

/** Registers the demo auth endpoints (GET /me, POST /login) under basePath. */
async function authRoutes(app: FastifyInstance, basePath: string) {
  // GET <basePath>/me – returns a hard-coded demo identity.
  app.get(`${basePath}/me`, async (request: FastifyRequest, reply: FastifyReply) => {
    const { requestId } = request as FastifyRequest & { requestId?: string };
    const payload: ApiResponse<{ userId: string; scope: string[] }> = {
      success: true,
      data: { userId: 'demo-user', scope: ['read:profile'] },
      meta: { requestId, timestamp: new Date().toISOString() },
    };
    return reply.send(payload);
  });

  // POST <basePath>/login – demo login; only checks that a username is present.
  app.post(`${basePath}/login`, async (request: FastifyRequest, reply: FastifyReply) => {
    const credentials = request.body as { username?: string; password?: string };
    if (!credentials?.username) {
      return reply.code(400).send({
        success: false,
        error: { code: 'VALIDATION_ERROR', message: 'username required' },
      } as ApiResponse);
    }
    logger.info({ username: credentials.username }, 'Login attempt');
    return reply.send({
      success: true,
      data: { token: 'demo-jwt', expiresIn: 3600 },
    } as ApiResponse);
  });
}

/** Module entry point – invoked by the registry with the resolved base path. */
async function register(app: unknown, ctx: ModuleContext): Promise<void> {
  await authRoutes(app as FastifyInstance, ctx.basePath);
  logger.info({ basePath: ctx.basePath }, 'Auth module registered');
}

const authModule: ModuleDefinition = { name: 'auth', register };

export { authModule };
export default authModule;
/**
* Hello World module – REFERENCE EXAMPLE for using the folder structure.
*
* This module shows:
* 1. How to define a module (name + register)
* 2. How to register routes under the module basePath
* 3. How to enqueue a background job via app.kafka (API → Kafka → Worker)
*
* Copy this folder when creating a new domain module (e.g. permits, incidents).
*/
import type { FastifyInstance, FastifyRequest, FastifyReply } from 'fastify';
import type { ModuleDefinition, ModuleContext, ApiResponse, HelloDemoPayload } from '@mafgateway/shared';
import { childLogger } from '@mafgateway/shared';
const logger = childLogger({ module: 'hello' });

/** Route: GET /api/v1/hello – Hello message + directory structure reference */
async function getHello(request: FastifyRequest, reply: FastifyReply) {
  const req = request as FastifyRequest & { requestId?: string };
  // Static payload: greeting plus a map describing where each piece of the
  // module lives and how a request/job flows through the system.
  return reply.send({
    success: true,
    data: {
      message: 'Hello World',
      from: 'hello module (reference example)',
      requestId: req.requestId,
      directoryStructure: {
        thisModule: 'apps/api/src/modules/hello/',
        flow: [
          '1. Request → apps/api/src/index.ts (bootstrap)',
          '2. Module loaded by apps/api/src/modules/registry.ts',
          '3. Routes registered here (apps/api/src/modules/hello/index.ts)',
          '4. Jobs enqueued via Producer Service → Message Broker → Worker Pool',
        ],
        paths: {
          apiBootstrap: 'apps/api/src/index.ts',
          moduleRegistry: 'apps/api/src/modules/registry.ts',
          thisFile: 'apps/api/src/modules/hello/index.ts',
          sharedPackage: 'packages/shared/ (types, config, logger, kafka)',
          workerHandlers: 'apps/worker/src/handlers/',
        },
      },
    },
    meta: { requestId: req.requestId },
  } as ApiResponse);
}

/** Route: POST /api/v1/hello/job – Enqueue a demo job (API → Kafka → Worker) */
async function postHelloJob(request: FastifyRequest, reply: FastifyReply, app: FastifyInstance) {
  const req = request as FastifyRequest & { requestId?: string };
  const body = (request.body as { message?: string }) ?? {};
  const jobId = crypto.randomUUID();
  // Build the full job payload; correlationId ties worker logs to this request.
  const job: HelloDemoPayload = {
    jobId,
    jobType: 'hello.demo',
    correlationId: req.requestId,
    createdAt: new Date().toISOString(),
    message: body.message ?? 'Hello from the hello module',
    requestId: req.requestId,
  };
  try {
    // Producer Service is the single enqueue path (decorated in index.ts).
    await app.producerService.enqueue(job);
    logger.info({ jobId, message: job.message }, 'Hello demo job enqueued');
    return reply.send({
      success: true,
      data: {
        jobId,
        message: 'Job enqueued. Run the worker to process it: pnpm run dev:worker',
        jobType: 'hello.demo',
      },
      meta: { requestId: req.requestId },
    } as ApiResponse);
  } catch (err) {
    // Publish failed (broker likely down) – report 503 instead of crashing.
    logger.error({ err }, 'Failed to enqueue hello job');
    return reply.code(503).send({
      success: false,
      error: { code: 'KAFKA_UNAVAILABLE', message: 'Could not enqueue job. Is Kafka running?' },
      meta: { requestId: req.requestId },
    } as ApiResponse);
  }
}

/** Module entry point – registers both routes under the registry-resolved base path. */
async function register(app: unknown, ctx: ModuleContext): Promise<void> {
  const fastify = app as FastifyInstance;
  const { basePath } = ctx;
  fastify.get(basePath, getHello);
  fastify.post(`${basePath}/job`, (req, reply) => postHelloJob(req, reply, fastify));
  logger.info({ basePath }, 'Hello module registered (reference example)');
}

const helloModule: ModuleDefinition = {
  name: 'hello',
  register,
};
export { helloModule };
export default helloModule;
/**
* Module manifest – list module names to load. Add a new name here when you add a module folder.
* Registry will dynamic-import ./<name>/index.js and call register().
*/
// Module names to load, in registration order. The registry dynamic-imports
// ./<name>/index.js for each entry and calls its register().
export const MODULE_NAMES = ['hello', 'auth'] as const;

// Union of valid module names ('hello' | 'auth').
export type ModuleName = (typeof MODULE_NAMES)[number];
/**
* Loads domain modules by convention: each name in MODULE_NAMES is dynamic-imported
* from ./<name>/index.js (must export default ModuleDefinition).
* To add a module: create modules/<name>/index.ts and add <name> to manifest.ts.
*/
import type { FastifyInstance } from 'fastify';
import type { AppConfig, ModuleDefinition } from '@mafgateway/shared';
import { MODULE_NAMES } from './manifest.js';
/**
 * Loads the domain modules listed in MODULE_NAMES. For each enabled module,
 * dynamic-imports ./<name>/index.js, validates its default export, and calls
 * register() with the resolved base path. Disabled or malformed modules are
 * skipped (malformed ones with a warning).
 */
export async function registerModules(app: FastifyInstance, config: AppConfig): Promise<void> {
  for (const name of MODULE_NAMES) {
    const moduleConfig = config.modules[name];
    if (!moduleConfig?.enabled) continue;
    const mod = await import(`./${name}/index.js`);
    const definition = mod.default as ModuleDefinition;
    if (!definition?.register) {
      app.log?.warn?.({ module: name }, 'Module has no default export with register, skipping');
      continue;
    }
    // Resolve the base path once so registration and logging agree.
    // (Previously the log line printed raw moduleConfig.basePath, which could
    // be undefined even though the module was registered under the fallback.)
    const basePath = moduleConfig.basePath ?? `/api/v1/${name}`;
    await definition.register(app, {
      basePath,
      config: moduleConfig,
    });
    app.log?.info?.({ module: definition.name, basePath }, 'Module registered');
  }
}
import type { FastifyRequest, FastifyReply } from 'fastify';
import type { AppConfig, HealthCheckResult } from '@mafgateway/shared';
/**
 * Builds the GET /health handler. Always returns { status: 'ok', env };
 * when the query string has deep=true, an (currently empty) checks array is
 * included as a placeholder for future module health checks.
 */
export function healthRoutes(config: AppConfig) {
  return async function handler(
    // Renamed from `_request`: the underscore prefix signals "unused",
    // but the handler does read request.query.
    request: FastifyRequest<{ Querystring: { deep?: string } }>,
    reply: FastifyReply
  ) {
    const deep = request.query?.deep === 'true';
    const result: { status: string; env: string; checks?: HealthCheckResult[] } = {
      status: 'ok',
      env: config.env,
    };
    if (deep) {
      // Placeholder for module health checks when they register
      result.checks = [];
    }
    return reply.send(result);
  };
}
/**
* Root-level hello – points to the hello module for the full example.
* The real reference is the module: GET /api/v1/hello and POST /api/v1/hello/job
*/
import type { FastifyInstance, FastifyRequest, FastifyReply } from 'fastify';
/**
 * Root-level convenience routes: a landing payload at GET / pointing to the
 * hello module, and a 302 redirect from GET /hello to the module route.
 */
export async function helloRoutes(app: FastifyInstance): Promise<void> {
  app.get('/', async (_request: FastifyRequest, reply: FastifyReply) =>
    reply.send({
      message: 'MAFGateway 2026',
      helloModule: 'GET /api/v1/hello – Hello + directory structure (reference example)',
      helloJob: 'POST /api/v1/hello/job – Enqueue a demo job (API → Kafka → Worker)',
      seeModule: 'apps/api/src/modules/hello/',
    })
  );

  app.get('/hello', async (_request: FastifyRequest, reply: FastifyReply) =>
    reply.redirect('/api/v1/hello', 302)
  );
}
/**
* Central route registration – add new core routes here to keep index.ts minimal.
*/
import type { FastifyInstance } from 'fastify';
import type { AppConfig } from '@mafgateway/shared';
import { healthRoutes } from './health.js';
import { helloRoutes } from './hello.js';
import { registerJobRoutes } from './jobs.js';
import { metricsRoutes } from './metrics.js';
/** Registers the core (non-module) routes: health, root hello, job enqueue, metrics. */
export async function registerRoutes(app: FastifyInstance, config: AppConfig): Promise<void> {
  app.get('/health', healthRoutes(config));
  // Route groups are independent of each other; register them in turn.
  for (const registerGroup of [helloRoutes, registerJobRoutes, metricsRoutes]) {
    await registerGroup(app);
  }
}
/**
* Job payload builder registry – single place to add new enqueueable job types.
* Add an entry here when you add a new job type; no need to edit the route handler.
*/
import type { JobPayload, ReportGeneratePayload, AnalyticsAggregatePayload } from '@mafgateway/shared';
// Per-request context available to builders (requestId → correlationId).
export type JobBuilderContext = { requestId?: string };

// A builder turns the caller-supplied partial payload into a complete JobPayload.
export type JobBuilder = (
  customPayload: Record<string, unknown> | undefined,
  ctx: JobBuilderContext
) => JobPayload;

// jobType → builder map; populated below via registerJobBuilder.
const builders: Partial<Record<JobPayload['jobType'], JobBuilder>> = {};

/** Registers a builder for one job type, keeping the payload type tied to it. */
function registerJobBuilder<T extends JobPayload['jobType']>(
  jobType: T,
  builder: (payload: Record<string, unknown> | undefined, ctx: JobBuilderContext) => Extract<JobPayload, { jobType: T }>
): void {
  builders[jobType] = builder as JobBuilder;
}

/** Build base fields for all jobs */
function base(jobId: string, jobType: JobPayload['jobType'], ctx: JobBuilderContext, metadata?: Record<string, unknown>) {
  return {
    jobId,
    jobType,
    correlationId: ctx.requestId,
    createdAt: new Date().toISOString(),
    metadata,
  };
}

// ---- Register known job types (add new ones here) ----
// NOTE(review): the raw customPayload is also stored as `metadata`, so the
// typed fields below (reportType, params, …) appear twice in the job payload
// — confirm this duplication is intended.
registerJobBuilder('report.generate', (customPayload, ctx) => {
  const jobId = crypto.randomUUID();
  return {
    ...base(jobId, 'report.generate', ctx, customPayload),
    reportType: (customPayload?.reportType as string) ?? 'default',
    params: (customPayload?.params as Record<string, unknown>) ?? {},
    userId: customPayload?.userId as string | undefined,
  } as ReportGeneratePayload;
});
registerJobBuilder('analytics.aggregate', (customPayload, ctx) => {
  const jobId = crypto.randomUUID();
  return {
    ...base(jobId, 'analytics.aggregate', ctx, customPayload),
    period: (customPayload?.period as string) ?? 'day',
    dimensions: (customPayload?.dimensions as string[]) ?? [],
  } as AnalyticsAggregatePayload;
});

/** Looks up the builder for a job type; undefined when unregistered. */
export function getJobBuilder(jobType: string): JobBuilder | undefined {
  return builders[jobType as JobPayload['jobType']];
}

/** Lists all job types that can currently be enqueued. */
export function getRegisteredJobTypes(): string[] {
  return Object.keys(builders);
}
/**
* Job enqueue API – uses job-registry for payload building. Add new job types in job-registry.ts.
*/
import type { FastifyInstance } from 'fastify';
import type { JobPayload } from '@mafgateway/shared';
import { getJobBuilder } from './job-registry.js';
/** Request body for POST /api/v1/jobs/enqueue. */
interface EnqueueBody {
  jobType: string;
  payload?: Record<string, unknown>;
}

/**
 * Generic job enqueue endpoint. Looks up a payload builder by jobType in the
 * job registry, builds the full job payload, and hands it to the Producer
 * Service. Error responses now use the structured { code, message } shape
 * used everywhere else in the API (previously plain strings), and a failed
 * publish returns 503 like the hello module instead of bubbling into the
 * generic 500 error handler.
 */
export async function registerJobRoutes(app: FastifyInstance): Promise<void> {
  app.post<{ Body: EnqueueBody }>('/api/v1/jobs/enqueue', async (request, reply) => {
    const { jobType, payload: customPayload } = request.body ?? {};
    const producer = app.producerService;
    if (!producer) {
      return reply.code(503).send({
        success: false,
        error: { code: 'JOBS_UNAVAILABLE', message: 'Jobs unavailable' },
      });
    }
    const builder = getJobBuilder(jobType);
    if (!builder) {
      return reply.code(400).send({
        success: false,
        error: { code: 'UNKNOWN_JOB_TYPE', message: `Unknown jobType: ${jobType}` },
      });
    }
    const req = request as { requestId?: string };
    const job = builder(customPayload, { requestId: req.requestId }) as JobPayload;
    try {
      await producer.enqueue(job);
    } catch {
      // Consistent with the hello module: broker failures surface as 503.
      return reply.code(503).send({
        success: false,
        error: { code: 'KAFKA_UNAVAILABLE', message: 'Could not enqueue job. Is Kafka running?' },
      });
    }
    return reply.send({ success: true, data: { jobId: job.jobId } });
  });
}
/**
* Placeholder metrics endpoint - integrate with prom-client or OpenTelemetry in production.
*/
import type { FastifyInstance, FastifyRequest, FastifyReply } from 'fastify';
/** GET /metrics – plain-text placeholder until a real exporter is wired in. */
export async function metricsRoutes(app: FastifyInstance): Promise<void> {
  app.get('/metrics', async (_request: FastifyRequest, reply: FastifyReply) => {
    const body =
      '# MAFGateway 2026 metrics (placeholder)\n# Add prom-client or OpenTelemetry exporter here\nmafgateway_info{version="1.0.0"} 1\n';
    reply.header('Content-Type', 'text/plain; charset=utf-8');
    return reply.send(body);
  });
}
import type { KafkaJobClient, ProducerService } from '@mafgateway/shared';
// Fastify module augmentation: the decorations added during bootstrap
// (app.decorate('kafka', …) and app.decorate('producerService', …) in
// apps/api/src/index.ts) become typed properties on every FastifyInstance.
declare module 'fastify' {
  interface FastifyInstance {
    kafka: KafkaJobClient; // raw Kafka client (connect/publish/disconnect)
    producerService: ProducerService; // single enqueue path used by routes/modules
  }
}
{
"extends": "../../tsconfig.base.json",
"compilerOptions": {
"outDir": "./dist",
"rootDir": "./src"
},
"include": ["src/**/*"]
}
{
"name": "worker",
"version": "1.0.0",
"private": true,
"type": "module",
"main": "dist/index.js",
"scripts": {
"build": "tsc",
"dev": "tsx watch src/index.ts",
"start": "node dist/index.js",
"clean": "rm -rf dist",
"typecheck": "tsc --noEmit",
"lint": "eslint src --ext .ts",
"test": "echo 'tests'"
},
"dependencies": {
"@mafgateway/shared": "workspace:*"
},
"devDependencies": {
"tsx": "^4.19.2",
"typescript": "^5.7.2"
}
}
/**
* Dead Letter Queue Processor – consumes from DLQ topic.
* Logs and persists failed messages for inspection / replay / alerting.
*/
import type { JobPayload, JobResult, ResultStore } from '@mafgateway/shared';
import type { Logger } from '@mafgateway/shared';
// Shape of a message on the DLQ topic: the original job fields plus the
// dlqReason/dlqAt annotations added when the job was dead-lettered.
interface DLQMessage {
  jobId: string;
  jobType: string;
  dlqReason?: string;
  dlqAt?: string;
  [k: string]: unknown;
}

/**
 * Processes one raw DLQ message: parses the JSON value, logs the failure,
 * and persists a 'failed' JobResult into the result store for inspection
 * or replay. Unparseable values are logged (truncated to 200 chars) and
 * dropped.
 */
export async function handleDLQMessage(
  msg: { key: string | null; value: string },
  logger: Logger,
  resultStore: ResultStore
): Promise<void> {
  let payload: DLQMessage;
  try {
    payload = JSON.parse(msg.value) as DLQMessage;
  } catch {
    logger.error({ value: msg.value.slice(0, 200) }, 'DLQ Processor: invalid JSON');
    return;
  }
  logger.warn(
    { jobId: payload.jobId, jobType: payload.jobType, dlqReason: payload.dlqReason, dlqAt: payload.dlqAt },
    'DLQ Processor: received failed job'
  );
  // NOTE(review): jobId is assumed present on every DLQ message; a message
  // without one would be stored under `undefined` — confirm the producer
  // side always sets it.
  const result: JobResult = {
    jobId: payload.jobId,
    jobType: payload.jobType ?? 'unknown',
    status: 'failed',
    error: payload.dlqReason ?? 'Unknown',
    completedAt: payload.dlqAt ?? new Date().toISOString(),
  };
  // Cast: the DLQ message carries the original JobPayload fields.
  await resultStore.save(payload.jobId, payload as JobPayload, result);
}
import type { AnalyticsAggregatePayload } from '@mafgateway/shared';
import type { Logger } from '@mafgateway/shared';
/**
 * Worker-side handler for analytics.aggregate jobs: logs the requested
 * period/dimensions, simulates ~300ms of aggregation work, then logs
 * completion.
 */
export async function analyticsAggregateHandler(
  payload: AnalyticsAggregatePayload,
  logger: Logger
): Promise<void> {
  const { period, dimensions } = payload;
  logger.info({ period, dimensions }, 'Aggregating analytics (simulated)');
  // Stand-in for a real aggregation pipeline.
  await new Promise<void>((resolve) => setTimeout(resolve, 300));
  logger.info({ jobId: payload.jobId }, 'Analytics aggregation complete');
}
/**
* Handler for hello.demo jobs – enqueued by the hello module (POST /api/v1/hello/job).
*/
import type { HelloDemoPayload } from '@mafgateway/shared';
import type { Logger } from '@mafgateway/shared';
/**
 * Worker-side handler for hello.demo jobs (enqueued via POST /api/v1/hello/job).
 * Logs receipt, simulates ~100ms of work, then logs completion.
 */
export async function helloDemoHandler(payload: HelloDemoPayload, logger: Logger): Promise<void> {
  const { jobId, message, requestId } = payload;
  logger.info({ jobId, message, requestId }, 'Hello demo job processed (worker)');
  // Stand-in for real work (e.g. sending email, calling external API).
  await new Promise<void>((resolve) => setTimeout(resolve, 100));
  logger.info({ jobId }, 'Hello demo job complete');
}
/**
* Job handler registry – map jobType → handler. Add new entries here when adding job types.
*/
import type {
JobPayload,
HelloDemoPayload,
ReportGeneratePayload,
AnalyticsAggregatePayload,
Logger,
} from '@mafgateway/shared';
import { helloDemoHandler } from './hello.js';
import { reportGenerateHandler } from './report.js';
import { analyticsAggregateHandler } from './analytics.js';
// Uniform handler signature the dispatcher works with.
type JobHandler = (payload: JobPayload, logger: Logger) => Promise<void>;

// jobType → handler. Add an entry here when introducing a new job type.
const handlers: Partial<Record<JobPayload['jobType'], JobHandler>> = {
  'hello.demo': async (p, log) => helloDemoHandler(p as HelloDemoPayload, log),
  'report.generate': async (p, log) => reportGenerateHandler(p as ReportGeneratePayload, log),
  'analytics.aggregate': async (p, log) => analyticsAggregateHandler(p as AnalyticsAggregatePayload, log),
};

/** Dispatches a job to its registered handler; unknown types are logged and skipped. */
export async function handleJob(payload: JobPayload, logger: Logger): Promise<void> {
  const handler = handlers[payload.jobType];
  if (handler) {
    await handler(payload, logger);
    return;
  }
  logger.warn({ jobType: payload.jobType }, 'Unknown job type, skipping');
}
import type { ReportGeneratePayload } from '@mafgateway/shared';
import type { Logger } from '@mafgateway/shared';
/**
 * Worker-side handler for report.generate jobs: logs the report type and
 * params, simulates ~500ms of generation work, then logs completion.
 */
export async function reportGenerateHandler(
  payload: ReportGeneratePayload,
  logger: Logger
): Promise<void> {
  const { reportType, params } = payload;
  logger.info({ reportType, params }, 'Generating report (simulated)');
  // Stand-in for a real rendering/export pipeline.
  await new Promise<void>((resolve) => setTimeout(resolve, 500));
  logger.info({ jobId: payload.jobId }, 'Report generation complete');
}
/**
* MAFGateway 2026 – Worker process.
*
* Queue architecture:
* Message Broker (Kafka)
* │
* ┌────┴────┬─────────────────┬─────────────────┐
* ▼ ▼ ▼ ▼
* Worker Pool Stream Processors (DLQ topic) Dead Letter Queue
* │ │ │
* ▼ ▼ ▼
* ResultStore ResultStore DLQ Processor
* (DB/Cache) (DB/Cache) → ResultStore / alert
*/
import {
loadConfig,
createLogger,
createKafkaJobClient,
createQueueConsumer,
createDefaultResultStore,
type JobPayload,
type JobResult,
} from '@mafgateway/shared';
import { handleJob } from './handlers/index.js';
import { handleStreamMessage } from './stream-processor.js';
import { handleDLQMessage } from './dlq-processor.js';
/**
 * Worker entry point: wires up the three consumer roles against Kafka.
 * - Worker Pool: main jobs topic, heavy tasks, retry + DLQ (via KafkaJobClient).
 * - Stream Processors: same topic, separate group, stream-type jobs only.
 * - DLQ Processor: consumes the dead-letter topic.
 * All three persist outcomes to the shared ResultStore.
 */
async function main() {
  const config = loadConfig();
  const logger = createLogger({ level: config.observability.logLevel });
  // Sink: Database / Cache / Storage (Worker Pool + Stream + DLQ write here)
  const resultStore = createDefaultResultStore();
  // ---- Worker Pool: consumes main queue, heavy tasks, retry + DLQ ----
  const kafkaPool = createKafkaJobClient({
    ...config.kafka,
    clientId: config.kafka.clientId + '-pool',
  });
  await kafkaPool.connect();
  await kafkaPool.startConsumer(config.kafka.consumerGroup, async (payload: JobPayload) => {
    // Stream Processors handle these; Worker Pool skips to avoid duplicate work
    if (payload.jobType === 'analytics.aggregate') return;
    logger.info({ jobId: payload.jobId, jobType: payload.jobType }, 'Worker Pool: processing');
    try {
      await handleJob(payload, logger);
      const result: JobResult = {
        jobId: payload.jobId,
        jobType: payload.jobType,
        status: 'completed',
        completedAt: new Date().toISOString(),
      };
      await resultStore.save(payload.jobId, payload, result);
    } catch (err) {
      const error = err instanceof Error ? err : new Error(String(err));
      const result: JobResult = {
        jobId: payload.jobId,
        jobType: payload.jobType,
        status: 'failed',
        error: error.message,
        completedAt: new Date().toISOString(),
      };
      await resultStore.save(payload.jobId, payload, result);
      throw err; // let KafkaJobClient handle retry/DLQ
    }
  });
  // ---- Stream Processors: same topic, different group, stream-type jobs only ----
  const streamConsumer = createQueueConsumer(
    {
      brokers: config.kafka.brokers,
      clientId: config.kafka.clientId + '-stream',
      sasl: config.kafka.sasl,
      ssl: config.kafka.ssl,
    },
    config.kafka.streamProcessorGroup,
    config.kafka.topics.jobs,
    async (msg) => {
      await handleStreamMessage(msg, logger, resultStore);
    }
  );
  await streamConsumer.start();
  // ---- Dead Letter Queue Processor: consumes DLQ topic ----
  const dlqConsumer = createQueueConsumer(
    {
      brokers: config.kafka.brokers,
      clientId: config.kafka.clientId + '-dlq',
      sasl: config.kafka.sasl,
      ssl: config.kafka.ssl,
    },
    config.kafka.dlqProcessorGroup,
    config.kafka.topics.dlq,
    async (msg) => {
      await handleDLQMessage(msg, logger, resultStore);
    }
  );
  await dlqConsumer.start();
  // Fix: shutdown is registered for both SIGTERM and SIGINT; without a guard a
  // second signal re-enters the disconnect sequence mid-teardown. Also catch
  // teardown errors so they don't become unhandled promise rejections.
  let shuttingDown = false;
  const shutdown = async () => {
    if (shuttingDown) return;
    shuttingDown = true;
    logger.info('Shutting down worker...');
    try {
      await streamConsumer.disconnect();
      await dlqConsumer.disconnect();
      await kafkaPool.disconnect();
      process.exit(0);
    } catch (err) {
      logger.error({ err }, 'Error during worker shutdown');
      process.exit(1);
    }
  };
  process.on('SIGTERM', () => void shutdown());
  process.on('SIGINT', () => void shutdown());
  logger.info(
    {
      poolGroup: config.kafka.consumerGroup,
      streamGroup: config.kafka.streamProcessorGroup,
      dlqGroup: config.kafka.dlqProcessorGroup,
    },
    'Worker started (Worker Pool + Stream Processors + DLQ Processor)'
  );
}
// Top-level entry: run main(); on fatal startup error, log it and exit non-zero
// so the process manager / orchestrator restarts the worker.
main().catch((err) => {
  console.error(err);
  process.exit(1);
});
/**
* Stream Processors – consume from main queue (same topic as Worker Pool, different group).
* Handle only "stream" job types (e.g. analytics.aggregate); others are skipped (Worker Pool handles them).
*/
import type { AnalyticsAggregatePayload, JobPayload, JobResult, ResultStore } from '@mafgateway/shared';
import type { Logger } from '@mafgateway/shared';
import { analyticsAggregateHandler } from './handlers/analytics.js';
const STREAM_JOB_TYPES: JobPayload['jobType'][] = ['analytics.aggregate'];
export async function handleStreamMessage(
msg: { key: string | null; value: string },
logger: Logger,
resultStore: ResultStore
): Promise<void> {
let job: JobPayload;
try {
job = JSON.parse(msg.value) as JobPayload;
} catch {
return;
}
if (!STREAM_JOB_TYPES.includes(job.jobType)) {
return; // Worker Pool handles this
}
logger.info({ jobId: job.jobId, jobType: job.jobType }, 'Stream Processor: processing');
try {
await analyticsAggregateHandler(job as import('@mafgateway/shared').AnalyticsAggregatePayload, logger);
const result: JobResult = {
jobId: job.jobId,
jobType: job.jobType,
status: 'completed',
completedAt: new Date().toISOString(),
};
await resultStore.save(job.jobId, job, result);
} catch (err) {
const error = err instanceof Error ? err : new Error(String(err));
logger.warn({ jobId: job.jobId, error: error.message }, 'Stream Processor: failed');
const result: JobResult = {
jobId: job.jobId,
jobType: job.jobType,
status: 'failed',
error: error.message,
completedAt: new Date().toISOString(),
};
await resultStore.save(job.jobId, job, result);
}
}
{
  // Package-level TypeScript config: inherits strict compiler settings from the repo root.
  "extends": "../../tsconfig.base.json",
  "compilerOptions": {
    // Compiled output goes to dist/, mirroring the src/ layout.
    "outDir": "./dist",
    "rootDir": "./src"
  },
  "include": ["src/**/*"]
}
# MAFGateway 2026 - Local dev stack (API + Worker + Kafka)
# Numeric env values are quoted so YAML keeps them as strings (matching the
# existing "true" convention and avoiding implicit-typing surprises).
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.6.0
    environment:
      ZOOKEEPER_CLIENT_PORT: "2181"
      ZOOKEEPER_TICK_TIME: "2000"
    ports:
      - "2181:2181"
  kafka:
    image: confluentinc/cp-kafka:7.6.0
    depends_on: [zookeeper]
    ports:
      # Host listener for API/Worker processes running outside Docker.
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: "1"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      # In-network listener kafka:29092; host listener localhost:9092.
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
  api:
    build:
      context: .
      dockerfile: Dockerfile.api
    ports:
      - "3000:3000"
    environment:
      NODE_ENV: production
      PORT: "3000"
      HOST: 0.0.0.0
      KAFKA_BROKERS: kafka:29092
      KAFKA_CLIENT_ID: mafgateway-api
      KAFKA_TOPIC_JOBS: mafgateway.jobs
      KAFKA_TOPIC_DLQ: mafgateway.jobs.dlq
    depends_on: [kafka]
  worker:
    build:
      context: .
      dockerfile: Dockerfile.worker
    environment:
      NODE_ENV: production
      KAFKA_BROKERS: kafka:29092
      KAFKA_CLIENT_ID: mafgateway-worker
      KAFKA_CONSUMER_GROUP: mafgateway-workers
      # Explicit stream/DLQ groups, matching the defaults in packages/shared/src/config.ts.
      KAFKA_STREAM_GROUP: mafgateway-stream-processors
      KAFKA_DLQ_GROUP: mafgateway-dlq-processors
      KAFKA_TOPIC_JOBS: mafgateway.jobs
      KAFKA_TOPIC_DLQ: mafgateway.jobs.dlq
    depends_on: [kafka]
# MAFGateway 2026 – Architecture (simplified)
## Folder structure
```
MAF2.0/
├── apps/
│ ├── api/ # HTTP API (modular monolith)
│ │ └── src/
│ │ ├── index.ts # Bootstrap, plugins, route registration, module loader
│ │ ├── routes/ # Global routes (hello, health, jobs, metrics)
│ │ └── modules/ # Domain modules (one folder per domain)
│ │ ├── auth/
│ │ ├── permits/
│ │ ├── incidents/
│ │ ├── analytics/
│ │ └── notifications/
│ └── worker/ # Background job processor (Kafka consumer)
│ └── src/
│ └── handlers/ # One file per job type
├── packages/
│ └── shared/ # Single shared package
│ └── src/
│ ├── types.ts # DTOs, job payloads, module contract
│ ├── config.ts # loadConfig() from env
│ ├── logger.ts # Pino logger
│ ├── kafka.ts # Producer/consumer, retry, DLQ
│ └── index.ts
├── docs/
├── package.json
└── pnpm-workspace.yaml
```
## Why this is simpler
| Before | After |
|--------|--------|
| 4 packages: types, config, logger, kafka-client | 1 package: **shared** |
| Domain modules as separate workspace packages (`modules/auth`, etc.) with their own package.json, tsconfig, build | Domain modules as **folders** under `apps/api/src/modules/`; no extra build step, no workspace entries |
| Registry imported from `@mafgateway/module-auth` | Registry imports from `./auth/index.js` (local path) |
## Service boundaries
- **API** – Serves HTTP. Loads each domain module from `src/modules/<name>/index.ts` via `registry.ts`. Each module exports `{ name, register(app, ctx) }`.
- **Worker** – Consumes from Kafka topic `mafgateway.jobs`, dispatches by `jobType` to handlers. Retries with backoff; after max retries, sends to DLQ topic.
- **shared** – Used by both API and worker. No business logic; only types, config, logger, Kafka client.
## Extracting a module to a microservice
1. Copy `apps/api/src/modules/<name>/` (e.g. `auth`) into a new app or repo.
2. Copy or depend on `packages/shared` (or only the parts you need).
3. In the new app, expose the same HTTP routes.
4. In the monolith, remove the module from `registry.ts` and call the new service via HTTP/gRPC instead of `register()`.
Same API contract; only the call moves from in-process to network.
## Kafka and worker
- **Topics:** `mafgateway.jobs` (main), `mafgateway.jobs.dlq` (dead-letter).
- **Retry:** Up to 5 attempts, exponential backoff; then DLQ.
- **Scaling:** Run more worker instances; same consumer group shares partitions.
## Observability
- **Logging:** Pino via `@mafgateway/shared`; `requestId` and module context.
- **Health:** `GET /health`.
- **Metrics:** `GET /metrics` (placeholder; add Prometheus/OpenTelemetry as needed).
# Hello World module – reference example
The **hello** module is the reference for how to use the folder structure: routes, jobs, and workers.
## Try it
```bash
# Terminal 1: API
pnpm run dev:api
# Terminal 2: Worker (optional; needed to process enqueued jobs)
pnpm run dev:worker
# Terminal 3: requests
curl http://localhost:3000/
curl http://localhost:3000/hello # redirects to /api/v1/hello
curl http://localhost:3000/api/v1/hello # Hello + directory structure
curl -X POST http://localhost:3000/api/v1/hello/job -H "Content-Type: application/json" -d '{"message":"My first job"}'
```
With the worker running, the last request enqueues a job; the worker logs that it processed the `hello.demo` job.
---
## What the hello module demonstrates
| Piece | Location | Purpose |
|-------|----------|---------|
| **Module definition** | `apps/api/src/modules/hello/index.ts` | Exports `helloModule` with `name` + `register(app, ctx)`. |
| **Routes** | Same file | `GET /api/v1/hello` (hello + structure), `POST /api/v1/hello/job` (enqueue job). |
| **Job enqueue** | Same file, `postHelloJob` | Uses `app.kafka.publishJob(job)` (API → Kafka). |
| **Shared types** | `packages/shared/src/types.ts` | `JobType` + `HelloDemoPayload` for `hello.demo` jobs. |
| **Config** | `packages/shared/src/config.ts` | `modules.hello` with `basePath: '/api/v1/hello'`. |
| **Registry** | `apps/api/src/modules/registry.ts` | Imports `helloModule` and adds it to the `modules` array. |
| **Worker handler** | `apps/worker/src/handlers/hello.ts` | Handles `hello.demo` (Kafka → Worker). |
| **Worker dispatch** | `apps/worker/src/handlers/index.ts` | Maps `'hello.demo'` to `helloDemoHandler` in the `handlers` registry. |
---
## Flow: API → Kafka → Worker
```
POST /api/v1/hello/job
apps/api/src/modules/hello/index.ts → app.kafka.publishJob(job)
Kafka topic: mafgateway.jobs
apps/worker/src/index.ts (consumer)
apps/worker/src/handlers/index.ts → case 'hello.demo'
apps/worker/src/handlers/hello.ts → helloDemoHandler()
```
---
## Using the hello module as a template
1. **New module (e.g. permits)**
Copy `apps/api/src/modules/hello/` to `apps/api/src/modules/permits/`.
Rename to `permitsModule`, routes under `/api/v1/permits`.
Add to `registry.ts` and `config.modules.permits`.
2. **New job type**
In `packages/shared/src/types.ts`: add to `JobType` and define a payload (e.g. `PermitsSyncPayload`).
In `apps/worker/src/handlers/`: add a handler and register it in the `handlers` map in `handlers/index.ts`.
3. **Enqueue from a module**
In the module’s route handler, use `app.kafka.publishJob(payload)`.
Ensure the payload matches a `JobType` and that the worker has a handler for it.
See also: **apps/api/src/modules/hello/README.md**.
---
## Directory layout (quick ref)
| Path | Purpose |
|------|--------|
| **apps/api/src/index.ts** | Bootstrap, plugins, routes, Kafka, module loader. |
| **apps/api/src/routes/** | Global routes (/, /health, /api/v1/jobs/enqueue, /metrics). |
| **apps/api/src/modules/hello/** | Hello module (reference): routes + job enqueue. |
| **apps/api/src/modules/registry.ts** | Loads all domain modules. |
| **packages/shared/** | Types, config, logger, Kafka client. |
| **apps/worker/src/handlers/** | Job handlers by `jobType`. |
# Install Kafka for local development
Use one of the options below so the API and Worker can use Kafka locally.
---
## Option 1: Docker (recommended)
If you have **Docker** and **Docker Compose** installed:
```bash
# From the project root (MAF2.0)
docker compose up -d zookeeper kafka
```
Kafka will listen on **localhost:9092**. Use in `.env`:
```bash
KAFKA_BROKERS=localhost:9092
```
To install Docker on Ubuntu:
```bash
sudo apt update
sudo apt install docker.io docker-compose-v2
# Add your user to the docker group to run without sudo:
sudo usermod -aG docker $USER
# Log out and back in, then:
docker compose up -d zookeeper kafka
```
---
## Option 2: Docker Compose (legacy command)
If you have the older `docker-compose` (with hyphen):
```bash
docker-compose up -d zookeeper kafka
```
---
## Option 3: Install Kafka on the host (no Docker)
Apache Kafka is installed under the project as **`.kafka/`** (ignored by git). If you need to reinstall or install on another machine:
```bash
# Java required (OpenJDK 17+)
java -version
# From project root (MAF2.0)
mkdir -p .kafka && cd .kafka
wget https://archive.apache.org/dist/kafka/3.6.2/kafka_2.13-3.6.2.tgz -O kafka.tgz
tar -xzf kafka.tgz && mv kafka_2.13-3.6.2 kafka && rm kafka.tgz
cd kafka
# Terminal 1: Zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
# Terminal 2: Kafka
bin/kafka-server-start.sh config/server.properties
```
**Start/stop scripts (after install):**
```bash
./scripts/kafka-start.sh # Start Zookeeper + Kafka in background
./scripts/kafka-stop.sh # Stop both
```
Topics `mafgateway.jobs` and `mafgateway.jobs.dlq` are created automatically when the app first uses them, or create them once:
```bash
cd .kafka/kafka
bin/kafka-topics.sh --create --topic mafgateway.jobs --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
bin/kafka-topics.sh --create --topic mafgateway.jobs.dlq --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
```
Then set in `.env`:
```bash
KAFKA_BROKERS=localhost:9092
```
---
## Verify
1. Copy `.env.example` to `.env` and set `KAFKA_BROKERS=localhost:9092`.
2. Start the API: `pnpm run dev:api`.
3. Start the Worker: `pnpm run dev:worker`.
4. Enqueue a job: `curl -X POST http://localhost:3000/api/v1/hello/job -H "Content-Type: application/json" -d '{}'`.
5. Check the Worker terminal for a log line like “Hello demo job complete”.
---
## Stop Kafka (Docker)
```bash
docker compose down
# Or: docker-compose down
```
# Queue system architecture
```
Clients / APIs
API Gateway / Load Balancer ← Fastify API (gateway.ts: request id, logging)
Producer Service Layer ← producerService.enqueue(job); single path to broker
Message Broker (Kafka) ← topics: mafgateway.jobs, mafgateway.jobs.dlq
├──────────────────┬──────────────────┬──────────────────┐
▼ ▼ ▼ ▼
Worker Pool Stream Processors (failed after Dead Letter Queue
(heavy tasks) (e.g. analytics) retries) (DLQ topic)
│ │ │
▼ ▼ ▼
ResultStore ResultStore DLQ Processor
(DB/Cache/Storage) (DB/Cache/Storage) → ResultStore / alert
```
## Components
| Layer | Implementation |
|-------|----------------|
| **API Gateway** | `apps/api`: Fastify + gateway.ts (onRequest: requestId, logging). Single entry for clients. |
| **Producer Service** | `packages/shared/producer-service.ts`. All enqueue goes through `producerService.enqueue(job)`. API and modules use this only. |
| **Message Broker** | Kafka. Channels: `mafgateway.jobs` (main queue), `mafgateway.jobs.dlq` (dead letter). |
| **Worker Pool** | `apps/worker`: main consumer (group `mafgateway-workers`). Handles hello.demo, report.generate, file.process, etc. Retry + DLQ on failure. Writes to ResultStore. |
| **Stream Processors** | Same topic, group `mafgateway-stream-processors`. Handles stream-type jobs (e.g. analytics.aggregate). Writes to ResultStore. |
| **Dead Letter Queue** | Failed jobs (after max retries) are produced to `mafgateway.jobs.dlq`. |
| **DLQ Processor** | Consumer on DLQ topic (group `mafgateway-dlq-processors`). Logs and persists to ResultStore for inspection/replay. |
| **ResultStore** | Sink for completed/failed results. Default: InMemoryResultStore; can use FileResultStore or DB. |
## Env (worker)
- `KAFKA_CONSUMER_GROUP` – Worker Pool group (default `mafgateway-workers`).
- `KAFKA_STREAM_GROUP` – Stream Processors group (default `mafgateway-stream-processors`).
- `KAFKA_DLQ_GROUP` – DLQ Processor group (default `mafgateway-dlq-processors`).
## Flow
1. Client → **API Gateway** → route or module calls **Producer Service** `enqueue(job)`.
2. **Producer Service** → produces to Kafka topic `mafgateway.jobs`.
3. **Worker Pool** and **Stream Processors** consume from `mafgateway.jobs` (different groups).
- Pool: handles most job types; retries then sends to DLQ on final failure.
- Stream: handles only stream types (e.g. analytics.aggregate).
4. **DLQ Processor** consumes from `mafgateway.jobs.dlq`, logs and stores for alerting/replay.
5. Workers and DLQ processor write results to **ResultStore** (Database / Cache / Storage).
# MAF2.0 – Code & Folder Structure Review (Scalability)
## Current structure (summary)
| Layer | Location | Notes |
|-------|----------|--------|
| **API** | `apps/api/src/` | Bootstrap, gateway, routes, module registry |
| **Worker** | `apps/worker/src/` | Kafka consumers (pool, stream, DLQ), job handlers |
| **Shared** | `packages/shared/` | Types, config, logger, Kafka, ProducerService, ResultStore |
| **Domain modules** | `apps/api/src/modules/` | hello, auth (used by API) |
| **Root modules/** | `modules/` | auth (built), analytics/incidents/permits/notifications (empty) – **not in pnpm workspace** |
---
## What’s working well
1. **Single shared package** – One place for types, config, logger, Kafka. Reduces coupling and version drift.
2. **Module contract** – `ModuleDefinition` (`name`, `register(app, ctx)`) is clear and extractable to a microservice later.
3. **Producer Service** – Single path for enqueue (`app.producerService.enqueue`) keeps broker usage consistent.
4. **Worker architecture** – Pool + stream processors + DLQ is scalable (more instances, same consumer groups).
5. **Config-driven modules** – `config.modules[name].enabled` allows turning modules off without code change.
6. **Fastify typing** – `fastify.d.ts` augments `FastifyInstance` with `kafka` and `producerService`.
---
## Scalability issues and recommendations
### 1. Duplicate / confusing module locations
- **Issue:** Domain modules live in `apps/api/src/modules/` (hello, auth). Root `modules/` has auth (with dist) and empty folders; it’s outside the pnpm workspace and not loaded by the API.
- **Recommendation:** Treat **`apps/api/src/modules/`** as the single source of truth. Remove or repurpose root `modules/` (e.g. delete empty ones, or move `modules/auth` into `apps/api/src/modules/` if it’s meant to be the same and remove the duplicate).
### 2. Manual module registry
- **Issue:** Adding a module requires editing `registry.ts` (import + add to array) and `config.ts` (add to `config.modules`). Easy to forget one.
- **Recommendation:** Use **convention-based discovery**: e.g. all folders under `src/modules/` that export a `ModuleDefinition` are loaded. Config can stay as-is for `enabled`/`basePath`, or be derived from a single manifest.
### 3. Job type handling in multiple places
- **Issue:** New job types require changes in: `packages/shared/src/types.ts`, `apps/api/src/routes/jobs.ts` (enqueue body parsing), `apps/worker/src/handlers/index.ts` (dispatch), and optionally `stream-processor.ts` for stream jobs.
- **Recommendation:** Introduce a **job registry**: one place that maps `jobType` → payload schema/validator and (on worker) handler. Enqueue route and worker dispatch both use this registry so adding a job type = one registration.
### 4. Route registration in bootstrap
- **Issue:** `index.ts` explicitly registers health, hello, jobs, metrics, then `registerModules()`. Fine for now, but as you add more global routes it gets noisy.
- **Recommendation:** Group core routes (health, metrics, jobs) in a small **routes registry** or `routes/index.ts` that `index.ts` calls once. Keeps bootstrap minimal and scales with more route groups.
### 5. ResultStore for horizontal scaling
- **Issue:** `createDefaultResultStore()` is in-memory; multiple API/worker instances don’t share state.
- **Recommendation:** Keep the interface; add a **Redis or DB-backed ResultStore** and choose it via config (e.g. `RESULT_STORE=redis`). No change to callers.
### 6. Deep health checks
- **Issue:** `GET /health?deep=true` has a placeholder; modules don’t register health checks.
- **Recommendation:** Extend `ModuleContext` or module contract with an optional `healthCheck?(): Promise<HealthCheckResult>`. Registry runs these when `deep=true` and merges into the response.
### 7. Unused packages (types, config, logger, kafka-client)
- **Issue:** `packages/` contains types, config, logger, kafka-client (with dist); apps use only `@mafgateway/shared`. Docs say you consolidated to one shared package.
- **Recommendation:** If you’re committed to a single shared package, remove or archive the unused packages to avoid confusion. If you prefer to split later, add them to the workspace and migrate shared to use them step by step.
### 8. Fastify instance augmentation
- **Issue:** As you add more shared services (DB, cache, etc.), `fastify.d.ts` will grow with many top-level props.
- **Recommendation:** Optional: add a single `app.services` (or `app.context`) object and augment that instead, so one place for “shared services” and fewer type edits.
---
## Recommended folder structure (scalable)
```
MAF2.0/
├── apps/
│ ├── api/
│ │ └── src/
│ │ ├── index.ts # Bootstrap only: load config, create app, register gateway, routes, modules
│ │ ├── gateway.ts
│ │ ├── routes/ # Global routes (health, metrics, jobs) + routes/index.ts that registers all
│ │ ├── modules/ # Single place for domain modules
│ │ │ ├── hello/
│ │ │ ├── auth/
│ │ │ ├── permits/ # Add when needed
│ │ │ └── ...
│ │ └── types/
│ └── worker/
│ └── src/
│ ├── index.ts
│ ├── handlers/ # One file per job type + registry (map jobType → handler)
│ ├── stream-processor.ts
│ └── dlq-processor.ts
├── packages/
│ └── shared/
│ └── src/
│ ├── types.ts # Include JobPayload union + job registry types
│ ├── config.ts
│ ├── logger.ts
│ ├── kafka.ts
│ ├── producer-service.ts
│ ├── result-store.ts
│ └── index.ts
├── docs/
├── package.json
└── pnpm-workspace.yaml
```
- **No root `modules/`** unless you explicitly use it (e.g. as a separate workspace for shared libraries); then add it to `pnpm-workspace.yaml`.
---
## Checklist for adding a new domain module (implemented)
1. Create `apps/api/src/modules/<name>/index.ts` with `{ name, register }` and `export default <name>Module`.
2. Add `<name>` to `MODULE_NAMES` in `apps/api/src/modules/manifest.ts`.
3. Add config for the module in `config.modules` in `packages/shared/src/config.ts`.
## Checklist for adding a new job type (implemented)
1. Add payload type and extend `JobPayload` in `packages/shared/src/types.ts`.
2. Register a builder in `apps/api/src/routes/job-registry.ts` via `registerJobBuilder('your.jobType', ...)`.
3. Add handler in `apps/worker/src/handlers/<name>.ts` and register in `handlers/index.ts` in the `handlers` map.
4. If it’s a stream job, add to `STREAM_JOB_TYPES` in `stream-processor.ts` and handle there (or reuse handler).
---
## Summary
- **Consolidate modules** in `apps/api/src/modules/` and remove or clarify root `modules/`.
- **Convention-based module discovery** (and optional config convention) to avoid editing registry for every new module.
- **Job registry** (map jobType → payload + handler) so new job types are added in one place.
- **Optional:** routes index, deep health from modules, Redis/DB ResultStore, single `app.services` for shared services.
These changes keep your current architecture and make it easier to add modules and job types without touching multiple files.
{
"name": "mafgateway-2026",
"version": "1.0.0",
"private": true,
"description": "MAFGateway 2026 - Scalable Fastify modular monolith with microservice-ready architecture",
"scripts": {
"build": "pnpm -r run build",
"dev": "pnpm run dev:api",
"dev:api": "pnpm --filter api run dev",
"dev:worker": "pnpm --filter worker run dev",
"dev:all": "concurrently \"pnpm run dev:api\" \"pnpm run dev:worker\"",
"start": "node apps/api/dist/index.js",
"start:worker": "node apps/worker/dist/index.js",
"test": "pnpm -r run test",
"lint": "pnpm -r run lint",
"typecheck": "pnpm -r run typecheck",
"clean": "pnpm -r run clean && rm -rf node_modules"
},
"engines": {
"node": ">=20.0.0"
},
"packageManager": "pnpm@9.14.2",
"devDependencies": {
"concurrently": "^9.1.0",
"typescript": "^5.7.2"
}
}
{
"name": "@mafgateway/shared",
"version": "1.0.0",
"private": true,
"main": "./dist/index.js",
"types": "./dist/index.d.ts",
"exports": {
".": { "types": "./dist/index.d.ts", "import": "./dist/index.js", "require": "./dist/index.js" }
},
"scripts": {
"build": "tsc",
"clean": "rm -rf dist",
"typecheck": "tsc --noEmit",
"lint": "echo 'lint shared'"
},
"dependencies": {
"kafkajs": "^2.2.4",
"pino": "^9.5.0",
"pino-pretty": "^13.0.0"
},
"devDependencies": {
"@types/node": "^22.10.0",
"typescript": "^5.7.2"
}
}
/**
* Message Broker Layer – topic/channel definitions for Queue and Stream.
* All producers and consumers use these channels.
*/
export const BROKER_CHANNELS = {
  /** Main job queue – Producer Service writes here; Worker Pool and Stream Processors read. */
  JOBS_QUEUE: 'mafgateway.jobs',
  /** Dead Letter Queue – failed jobs after max retries. DLQ Processor reads. */
  DLQ: 'mafgateway.jobs.dlq',
} as const;
/** Union of all channel names: 'mafgateway.jobs' | 'mafgateway.jobs.dlq'. */
export type BrokerChannel = (typeof BROKER_CHANNELS)[keyof typeof BROKER_CHANNELS];
/** Centralized config from env */
import type { ModuleConfig } from './types.js';
/** Fully-resolved application configuration, as returned by loadConfig(). */
export interface AppConfig {
  // Derived from NODE_ENV; defaults to 'development'.
  env: 'development' | 'staging' | 'production' | 'test';
  app: { name: string; port: number; host: string };
  kafka: {
    brokers: string[];
    clientId: string;
    /** Worker Pool consumer group. */
    consumerGroup: string;
    /** Stream Processors consumer group (same topic as the pool, separate group). */
    streamProcessorGroup: string;
    /** DLQ Processor consumer group. */
    dlqProcessorGroup: string;
    topics: { jobs: string; dlq: string };
    /** Azure Event Hubs (Kafka): set SASL/SSL env vars */
    sasl?: { mechanism: 'plain'; username: string; password: string };
    ssl?: boolean;
  };
  /** Per-module enable flag and base path, keyed by module name. */
  modules: Record<string, ModuleConfig>;
  observability: { logLevel: string; enableTracing: boolean; metricsPort?: number };
}
/**
 * Read a string environment variable, falling back to `defaultValue` when unset.
 * @throws Error when the variable is missing and no default was given.
 */
function getEnv(key: string, defaultValue?: string): string {
  const resolved = process.env[key] ?? defaultValue;
  if (resolved === undefined) {
    throw new Error(`Missing required env: ${key}`);
  }
  return resolved;
}
/**
 * Read a numeric environment variable; unset or empty-string falls back to `defaultValue`.
 * @throws Error when the variable is set but not parseable as a number.
 */
function getEnvNumber(key: string, defaultValue: number): number {
  const text = process.env[key];
  if (text === undefined || text === '') {
    return defaultValue;
  }
  const parsed = Number(text);
  if (Number.isNaN(parsed)) {
    throw new Error(`Invalid number for env: ${key}`);
  }
  return parsed;
}
export function loadConfig(): AppConfig {
const env = (process.env.NODE_ENV ?? 'development') as AppConfig['env'];
return {
env,
app: {
name: getEnv('APP_NAME', 'MAFGateway-2026'),
port: getEnvNumber('PORT', 3000),
host: getEnv('HOST', '0.0.0.0'),
},
kafka: {
brokers: getEnv('KAFKA_BROKERS', 'localhost:9092').split(',').map((b) => b.trim()),
clientId: getEnv('KAFKA_CLIENT_ID', 'mafgateway-2026'),
consumerGroup: getEnv('KAFKA_CONSUMER_GROUP', 'mafgateway-workers'),
streamProcessorGroup: getEnv('KAFKA_STREAM_GROUP', 'mafgateway-stream-processors'),
dlqProcessorGroup: getEnv('KAFKA_DLQ_GROUP', 'mafgateway-dlq-processors'),
topics: {
jobs: getEnv('KAFKA_TOPIC_JOBS', 'mafgateway.jobs'),
dlq: getEnv('KAFKA_TOPIC_DLQ', 'mafgateway.jobs.dlq'),
},
...(process.env.KAFKA_SASL_PASSWORD
? {
sasl: {
mechanism: 'plain' as const,
username: process.env.KAFKA_SASL_USERNAME ?? '$ConnectionString',
password: process.env.KAFKA_SASL_PASSWORD,
},
ssl: true,
}
: {}),
},
modules: {
hello: { name: 'hello', enabled: true, basePath: '/api/v1/hello' },
auth: { name: 'auth', enabled: true, basePath: '/api/v1/auth' },
permits: { name: 'permits', enabled: true, basePath: '/api/v1/permits' },
incidents: { name: 'incidents', enabled: true, basePath: '/api/v1/incidents' },
analytics: { name: 'analytics', enabled: true, basePath: '/api/v1/analytics' },
notifications: { name: 'notifications', enabled: true, basePath: '/api/v1/notifications' },
},
observability: {
logLevel: getEnv('LOG_LEVEL', env === 'production' ? 'info' : 'debug'),
enableTracing: process.env.ENABLE_TRACING === 'true',
metricsPort: getEnvNumber('METRICS_PORT', 9090),
},
};
}
/** @mafgateway/shared – types, config, logger, Kafka, Producer Service, Result Store */
// Core building blocks: DTO/job types, env-driven config, logging, broker channel names.
export * from './types.js';
export * from './config.js';
export * from './logger.js';
export * from './broker.js';
// Producer Service: the single enqueue path used by the API layer.
export { createProducerService, type ProducerService } from './producer-service.js';
// Result Store: sink for completed/failed job results (in-memory or file-backed).
export {
  createDefaultResultStore,
  InMemoryResultStore,
  FileResultStore,
  type ResultStore,
  type JobResult,
} from './result-store.js';
// Re-export pino's Logger type so consumers don't depend on pino directly.
export type { Logger } from 'pino';
// Kafka job client (producer + retry/DLQ consumer) and the generic queue consumer.
export { createKafkaJobClient, KafkaJobClient, type KafkaClientConfig } from './kafka.js';
export { createQueueConsumer, type QueueConsumer, type QueueConsumerConfig } from './queue-consumer.js';
/** Kafka producer/consumer with retry and DLQ */
import { Kafka, type Producer, type Consumer, type EachMessagePayload } from 'kafkajs';
import type { JobPayload } from './types.js';
import { childLogger } from './logger.js';
// Module-scoped logger tagged with the kafka module context.
const logger = childLogger({ module: 'kafka' });
// Max delivery attempts before a failing job is routed to the DLQ.
const MAX_RETRIES = 5;
// Base for exponential retry backoff: delay = RETRY_BACKOFF_MS * 2^retryCount.
const RETRY_BACKOFF_MS = 1000;
/** Connection and topic settings for KafkaJobClient. */
export interface KafkaClientConfig {
  brokers: string[];
  clientId: string;
  // Optional here; startConsumer() takes the group id explicitly.
  consumerGroup?: string;
  topics: { jobs: string; dlq: string };
  // SASL/PLAIN credentials (e.g. Azure Event Hubs Kafka endpoint — see config.ts).
  sasl?: { mechanism: 'plain'; username: string; password: string };
  ssl?: boolean;
}
/**
 * Kafka-backed job client: publishes jobs to the main topic, consumes them
 * with retry + exponential backoff, and routes repeatedly-failing jobs to
 * the DLQ topic. One instance owns at most one producer and one consumer.
 */
export class KafkaJobClient {
  private kafka: Kafka;
  private producer: Producer | null = null;
  private consumer: Consumer | null = null;
  private config: KafkaClientConfig;
  // Set by startConsumer(); invoked for every successfully-parsed job message.
  private messageHandler: ((payload: JobPayload) => Promise<void>) | null = null;
  constructor(config: KafkaClientConfig) {
    this.config = config;
    this.kafka = new Kafka({
      clientId: config.clientId,
      brokers: config.brokers,
      ssl: config.ssl ?? false,
      sasl: config.sasl,
      // kafkajs connection-level retry — distinct from the job-level MAX_RETRIES above.
      retry: { retries: 5, initialRetryTime: 300, maxRetryTime: 30000 },
    });
  }
  /** Connect the producer. Must be called before publishJob/publishToDLQ. */
  async connect(): Promise<void> {
    this.producer = this.kafka.producer();
    await this.producer.connect();
    logger.info('Kafka producer connected');
  }
  /** Disconnect producer and consumer (if started) and drop the references. */
  async disconnect(): Promise<void> {
    if (this.producer) await this.producer.disconnect();
    if (this.consumer) await this.consumer.disconnect();
    this.producer = null;
    this.consumer = null;
    logger.info('Kafka client disconnected');
  }
  /**
   * Publish a job to the main jobs topic.
   * Defaults the message key to jobId (stable partitioning per job) and carries
   * job-type / correlation-id as headers.
   * @throws Error if connect() has not been called.
   */
  async publishJob(job: JobPayload, key?: string): Promise<void> {
    if (!this.producer) throw new Error('Producer not connected');
    await this.producer.send({
      topic: this.config.topics.jobs,
      messages: [{
        key: key ?? job.jobId,
        value: JSON.stringify(job),
        headers: { 'content-type': 'application/json', 'job-type': job.jobType, ...(job.correlationId ? { 'correlation-id': job.correlationId } : {}) },
      }],
    });
    logger.info({ jobId: job.jobId, jobType: job.jobType }, 'Job published');
  }
  /**
   * Send a job to the DLQ topic, annotated with the failure reason and timestamp.
   * @throws Error if connect() has not been called.
   */
  async publishToDLQ(job: JobPayload, error: Error): Promise<void> {
    if (!this.producer) throw new Error('Producer not connected');
    await this.producer.send({
      topic: this.config.topics.dlq,
      messages: [{ key: job.jobId, value: JSON.stringify({ ...job, dlqReason: error.message, dlqAt: new Date().toISOString() }) }],
    });
    logger.warn({ jobId: job.jobId, error: error.message }, 'Job sent to DLQ');
  }
  /**
   * Start consuming the jobs topic in `groupId`, dispatching each parsed job
   * to `handler`. On handler failure the job is re-published with an
   * incremented retryCount after exponential backoff; once retryCount reaches
   * MAX_RETRIES it is sent to the DLQ instead. Unparseable messages are
   * logged and dropped.
   * NOTE(review): the backoff sleep runs inside eachMessage, so it delays this
   * partition while waiting — confirm that's acceptable for the workload.
   */
  async startConsumer(groupId: string, handler: (payload: JobPayload) => Promise<void>): Promise<void> {
    this.messageHandler = handler;
    this.consumer = this.kafka.consumer({ groupId });
    await this.consumer.connect();
    await this.consumer.subscribe({ topic: this.config.topics.jobs, fromBeginning: false });
    await this.consumer.run({
      eachMessage: async (payload: EachMessagePayload) => {
        const value = payload.message.value?.toString();
        if (!value || !this.messageHandler) return;
        let job: JobPayload;
        try {
          job = JSON.parse(value) as JobPayload;
        } catch {
          logger.error({ topic: payload.topic }, 'Invalid job JSON');
          return;
        }
        // Attempt number for THIS delivery (first delivery has retryCount undefined → 1).
        const retryCount = (job.retryCount ?? 0) + 1;
        try {
          await this.messageHandler(job);
        } catch (error) {
          const err = error instanceof Error ? error : new Error(String(error));
          if (retryCount >= MAX_RETRIES) await this.publishToDLQ({ ...job, retryCount }, err);
          else {
            // Exponential backoff before re-publishing the job for another attempt.
            const backoff = RETRY_BACKOFF_MS * Math.pow(2, retryCount);
            logger.warn({ jobId: job.jobId, retryCount, backoffMs: backoff }, 'Job failed, will retry');
            await new Promise((r) => setTimeout(r, backoff));
            await this.publishJob({ ...job, retryCount });
          }
        }
      },
    });
    logger.info({ groupId, topic: this.config.topics.jobs }, 'Consumer started');
  }
}
/** Thin factory over the KafkaJobClient constructor; does not open any connection. */
export function createKafkaJobClient(config: KafkaClientConfig): KafkaJobClient {
return new KafkaJobClient(config);
}
/** Structured logging */
import pino, { type LoggerOptions, type Logger } from 'pino';
/** Ambient structured fields bound onto a child logger's every line. */
export interface LoggerContext {
requestId?: string;
module?: string;
correlationId?: string;
// Any extra fields pass straight through as pino bindings.
[key: string]: unknown;
}
/**
 * Base pino options shared by every logger created here:
 * - level from LOG_LEVEL (default 'info')
 * - level rendered as its label; default bindings (pid/hostname) suppressed
 * - ISO-8601 timestamps
 * - pretty, colorized output via pino-pretty outside production
 */
const defaultOptions: LoggerOptions = {
level: process.env.LOG_LEVEL ?? 'info',
formatters: { level: (label) => ({ level: label }), bindings: () => ({}) },
timestamp: pino.stdTimeFunctions.isoTime,
...(process.env.NODE_ENV !== 'production'
? { transport: { target: 'pino-pretty', options: { colorize: true, translateTime: 'SYS:standard' } } }
: {}),
};
// Process-wide singleton: created lazily by getLogger(), replaceable via setLogger().
let rootLogger: Logger | null = null;
/** Build a new pino logger; caller options override defaults key-by-key (shallow merge). */
export function createLogger(options: Partial<LoggerOptions> = {}): Logger {
  const merged: LoggerOptions = { ...defaultOptions, ...options };
  return pino(merged);
}
/** Return the shared root logger, creating it on first access. */
export function getLogger(): Logger {
  rootLogger ??= createLogger();
  return rootLogger;
}
/** Replace the process-wide root logger (e.g. with a preconfigured instance in tests). */
export function setLogger(logger: Logger): void {
rootLogger = logger;
}
/** Derive a child of the shared root logger carrying the given context bindings. */
export function childLogger(context: LoggerContext): Logger {
  const parent = getLogger();
  return parent.child(context);
}
/**
* Producer Service Layer – single entry point for enqueueing jobs to the message broker.
* Used by API Gateway; never produces directly from route handlers.
*/
import type { JobPayload } from './types.js';
import type { KafkaJobClient } from './kafka.js';
import { childLogger } from './logger.js';
const logger = childLogger({ module: 'producer-service' });
/** Enqueue-only facade the API Gateway uses to hand jobs to the broker. */
export interface ProducerService {
enqueue(job: JobPayload, key?: string): Promise<void>;
}
/**
 * Wrap a connected KafkaJobClient in the ProducerService interface.
 * Delegates straight to publishJob and logs the enqueue at debug level.
 */
export function createProducerService(client: KafkaJobClient): ProducerService {
  const enqueue = async (job: JobPayload, key?: string): Promise<void> => {
    await client.publishJob(job, key);
    logger.debug({ jobId: job.jobId, jobType: job.jobType }, 'ProducerService: job enqueued');
  };
  return { enqueue };
}
/**
* Standalone queue consumer – for Stream Processors and DLQ Processor.
* Consumes from a topic without producer/retry (consumer-only).
*/
import { Kafka } from 'kafkajs';
import type { KafkaClientConfig } from './kafka.js';
import { childLogger } from './logger.js';
const logger = childLogger({ module: 'queue-consumer' });
/** Connection settings for a consumer-only client (subset of the full Kafka config). */
export interface QueueConsumerConfig extends Pick<KafkaClientConfig, 'brokers' | 'sasl' | 'ssl'> {
clientId: string;
}
/** Minimal lifecycle of a standalone consumer. */
export interface QueueConsumer {
start(): Promise<void>;
disconnect(): Promise<void>;
}
/**
* Create a consumer that reads from a topic and calls onMessage for each record.
* Used by Stream Processors and DLQ Processor.
*/
export function createQueueConsumer(
config: QueueConsumerConfig,
groupId: string,
topic: string,
onMessage: (payload: { key: string | null; value: string; headers?: Record<string, string> }) => Promise<void>
): QueueConsumer {
const kafka = new Kafka({
clientId: config.clientId,
brokers: config.brokers,
ssl: config.ssl ?? false,
sasl: config.sasl,
retry: { retries: 3, initialRetryTime: 300, maxRetryTime: 10000 },
});
const consumer = kafka.consumer({ groupId });
return {
async start(): Promise<void> {
await consumer.connect();
await consumer.subscribe({ topic, fromBeginning: false });
await consumer.run({
eachMessage: async ({ message }) => {
const value = message.value?.toString();
if (!value) return;
try {
await onMessage({
key: message.key?.toString() ?? null,
value,
headers: message.headers ? Object.fromEntries(
Object.entries(message.headers).map(([k, v]) => [k, v?.toString() ?? ''])
) : undefined,
});
} catch (err) {
logger.error({ err, topic, key: message.key?.toString() }, 'QueueConsumer: message handler error');
}
},
});
logger.info({ groupId, topic }, 'QueueConsumer started');
},
async disconnect(): Promise<void> {
await consumer.disconnect();
logger.info({ groupId, topic }, 'QueueConsumer disconnected');
},
};
}
/**
* Result Store – sink for Worker Pool and Stream Processors.
* Persist job results (Database / Cache / Storage). Implementations: in-memory, file, or DB.
*/
import type { JobPayload } from './types.js';
import { childLogger } from './logger.js';
const logger = childLogger({ module: 'result-store' });
/** Terminal outcome of a processed job, as persisted by a ResultStore. */
export interface JobResult {
jobId: string;
jobType: string;
status: 'completed' | 'failed';
result?: unknown;
error?: string;
completedAt: string; // ISO-8601 — presumably set by the worker; TODO confirm at call sites
}
/** Sink for job outcomes. `get` is optional so write-only stores (e.g. file) may omit it. */
export interface ResultStore {
save(jobId: string, payload: JobPayload, result: JobResult): Promise<void>;
get?(jobId: string): Promise<JobResult | null>;
}
/** In-memory store (for dev / single instance). Replace with Redis/DB for production. */
/** In-memory store (for dev / single instance). Replace with Redis/DB for production. */
export class InMemoryResultStore implements ResultStore {
  // jobId -> latest result; unbounded and process-local.
  private readonly results = new Map<string, JobResult>();
  async save(_jobId: string, _payload: JobPayload, result: JobResult): Promise<void> {
    // Keyed by result.jobId (the _jobId parameter is intentionally unused).
    this.results.set(result.jobId, result);
    logger.debug({ jobId: result.jobId, status: result.status }, 'ResultStore: saved');
  }
  async get(jobId: string): Promise<JobResult | null> {
    const found = this.results.get(jobId);
    return found ?? null;
  }
}
/** File-based store – writes one JSON file per job under a directory (e.g. for DLQ inspection). */
/** File-based store – writes one JSON file per job under a directory (e.g. for DLQ inspection). */
export class FileResultStore implements ResultStore {
  constructor(private readonly baseDir: string) {}
  async save(_jobId: string, _payload: JobPayload, result: JobResult): Promise<void> {
    // Lazily load node built-ins so constructing the store stays side-effect free.
    const [path, fs] = await Promise.all([import('node:path'), import('node:fs/promises')]);
    const target = path.join(this.baseDir, `${result.jobId}.json`);
    await fs.mkdir(this.baseDir, { recursive: true });
    await fs.writeFile(target, JSON.stringify(result, null, 2), 'utf-8');
    logger.debug({ jobId: result.jobId, file: target }, 'ResultStore: saved to file');
  }
}
/** Default store for local development: the in-memory implementation. */
export function createDefaultResultStore(): ResultStore {
return new InMemoryResultStore();
}
/** Shared types and contracts – used by API and Worker */
/** Standard envelope for API responses. */
export interface ApiResponse<T = unknown> {
success: boolean;
data?: T;
error?: { code: string; message: string };
meta?: { requestId?: string; timestamp?: string };
}
/** List responses: data is an array and meta always carries paging fields. */
export interface PaginatedResponse<T> extends ApiResponse<T[]> {
meta: ApiResponse['meta'] & { page: number; limit: number; total: number; totalPages: number };
}
/** Closed set of job discriminators carried in the `jobType` field. */
export type JobType =
| 'hello.demo'
| 'report.generate'
| 'analytics.aggregate'
| 'file.process'
| 'notification.send'
| 'incident.sync';
/** Fields common to every job envelope published to the broker. */
export interface BaseJobPayload {
jobId: string;
jobType: JobType;
correlationId?: string; // propagated into the 'correlation-id' Kafka header by the producer
createdAt: string; // ISO-8601 — presumably set when the job is enqueued; TODO confirm
retryCount?: number; // prior delivery attempts; bumped by the consumer's retry path
metadata?: Record<string, unknown>;
}
/** Demo job: echoes an optional message. */
export interface HelloDemoPayload extends BaseJobPayload {
jobType: 'hello.demo';
message?: string;
requestId?: string;
}
/** Report generation job parameters. */
export interface ReportGeneratePayload extends BaseJobPayload {
jobType: 'report.generate';
reportType: string;
params: Record<string, unknown>;
userId?: string;
}
/** Analytics aggregation over a period, optionally broken down by dimensions. */
export interface AnalyticsAggregatePayload extends BaseJobPayload {
jobType: 'analytics.aggregate';
period: string;
dimensions?: string[];
}
/** File-processing job identified by a storage key and one of three operations. */
export interface FileProcessPayload extends BaseJobPayload {
jobType: 'file.process';
fileKey: string;
operation: 'validate' | 'transform' | 'import';
}
// Discriminated union on jobType; the last arm is a loosely-typed catch-all
// for job types that have no dedicated payload interface yet.
export type JobPayload =
| HelloDemoPayload
| ReportGeneratePayload
| AnalyticsAggregatePayload
| FileProcessPayload
| (BaseJobPayload & { jobType: 'notification.send' | 'incident.sync'; [k: string]: unknown });
/** Per-module configuration entry; extra keys are module-specific. */
export interface ModuleConfig {
name: string;
enabled: boolean;
basePath?: string;
[key: string]: unknown;
}
/** Outcome of a single module's health probe. */
export interface HealthCheckResult {
status: 'ok' | 'degraded' | 'down';
module: string;
latencyMs?: number;
details?: Record<string, unknown>;
}
/** Context handed to a module when it registers routes. */
export interface ModuleContext {
basePath: string;
config: ModuleConfig;
}
/** Pluggable module: a name plus an async registration hook.
 * `app` is untyped here to avoid coupling shared types to a web framework. */
export interface ModuleDefinition {
name: string;
register: (app: unknown, ctx: ModuleContext) => Promise<void>;
}
{
"extends": "../../tsconfig.base.json",
"compilerOptions": { "outDir": "./dist", "rootDir": "./src", "types": ["node"] },
"include": ["src/**/*"]
}
This diff is collapsed. Click to expand it.
packages:
- "apps/*"
- "packages/*"
#!/usr/bin/env bash
# Start Zookeeper and Kafka (host install under .kafka/).
# Skips a daemon whose pid file points at a live process, so re-running the
# script cannot spawn duplicate brokers.
set -euo pipefail
ROOT="$(cd "$(dirname "$0")/.." && pwd)"
KAFKA_DIR="$ROOT/.kafka/kafka"
if [[ ! -d "$KAFKA_DIR" ]]; then
  echo "Kafka not found at $KAFKA_DIR. Run docs/INSTALL-KAFKA.md Option 3 or install manually."
  exit 1
fi

# already_running <pidfile> — succeed iff the pid file exists and its pid is alive.
already_running() {
  local f="$1"
  [[ -f "$f" ]] && kill -0 "$(cat "$f")" 2>/dev/null
}

cd "$KAFKA_DIR"
if already_running "$ROOT/.kafka/zookeeper.pid"; then
  echo "Zookeeper already running (PID $(cat "$ROOT/.kafka/zookeeper.pid")); skipping start."
else
  nohup bin/zookeeper-server-start.sh config/zookeeper.properties > "$ROOT/.kafka/zookeeper.log" 2>&1 &
  echo $! > "$ROOT/.kafka/zookeeper.pid"
  # Give Zookeeper a moment to bind before the broker tries to connect.
  sleep 3
fi
if already_running "$ROOT/.kafka/kafka.pid"; then
  echo "Kafka already running (PID $(cat "$ROOT/.kafka/kafka.pid")); skipping start."
else
  nohup bin/kafka-server-start.sh config/server.properties > "$ROOT/.kafka/kafka.log" 2>&1 &
  echo $! > "$ROOT/.kafka/kafka.pid"
fi
echo "Zookeeper and Kafka started. Logs: .kafka/zookeeper.log, .kafka/kafka.log"
echo "Broker: localhost:9092"
#!/usr/bin/env bash
# Stop Kafka then Zookeeper (host install)
set -e
ROOT="$(cd "$(dirname "$0")/.." && pwd)"

# stop_daemon <label> <pidfile> — read the pid file, signal the process if it
# is still alive, report, and remove the pid file.
stop_daemon() {
  local label="$1" pidfile="$2"
  if [[ -f "$pidfile" ]]; then
    local pid
    pid=$(cat "$pidfile")
    if kill -0 "$pid" 2>/dev/null; then
      kill "$pid" 2>/dev/null || true
      echo "Stopped $label (PID $pid)"
    fi
    rm -f "$pidfile"
  fi
}

# Order matters: broker first, then its Zookeeper.
stop_daemon kafka "$ROOT/.kafka/kafka.pid"
stop_daemon zookeeper "$ROOT/.kafka/zookeeper.pid"

# Also kill any leftover java processes for kafka/zookeeper in this project
pkill -f "kafka.Kafka" 2>/dev/null || true
pkill -f "QuorumPeerMain" 2>/dev/null || true
echo "Kafka and Zookeeper stopped."
{
"compilerOptions": {
"target": "ES2022",
"module": "NodeNext",
"moduleResolution": "NodeNext",
"lib": ["ES2022"],
"strict": true,
"esModuleInterop": true,
"skipLibCheck": true,
"forceConsistentCasingInFileNames": true,
"resolveJsonModule": true,
"declaration": true,
"declarationMap": true,
"sourceMap": true,
"noUnusedLocals": true,
"noUnusedParameters": true,
"noImplicitReturns": true,
"noFallthroughCasesInSwitch": true,
"isolatedModules": true,
"outDir": "./dist",
"rootDir": "."
},
"exclude": ["node_modules", "dist"]
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment