DEV-HTL-03 — GATEWAY TO SERVER DEVELOPMENT GUIDE
1. Purpose
This guide is the final implementation reference for Gateway (ESP32) ↔ Server (Raspberry Pi, bare-metal) communication in HortiLink. It covers:
- MQTT bridge implementation (gateway publish/subscribe)
- Consistent topic mapping (bound to HTL-01)
- Store-and-forward buffering while the broker or LAN is down
- Ingestion service on the Pi that writes to SQLite
- Command dispatch & ACK return path
- Config propagation & OTA metadata relay
Expected output: real, executable artifacts for the Gateway + Backend team.
2. Scope
✔ 2.1 In-Scope
- MQTT bridge implementation (ESP32 → Mosquitto)
- Topic mapping (site namespace + node namespace)
- Store-and-forward buffering (gateway side)
- DB ingestion service (Pi) → SQLite
- Command dispatch (Pi → gateway → node path at the DEV-HTL-02 boundary)
- Config propagation (Pi publish config version)
- OTA metadata relay (Pi publish metadata, gateway forward)
✔ 2.2 Out-of-Scope
- Node direct mode (DEV-HTL-01)
- Radio link Node↔Gateway (DEV-HTL-02)
- Cloud sync (DEV-HTL-04)
3. Reference (HTL Binding)
- HTL-01 – MQTT contract (topic/payload/command-ack semantics)
- HTL-03 – Gateway spec (routing validation, buffering, health publish)
- HTL-04 – Server spec (broker config, ingestion, RBAC, OTA repo)
- HTL-07 – MQTT security (ACL, credential storage, TLS optional LAN)
- HTL-08 – Broker failure modes & recovery expectations
- HTL-09 – Integration test + failure injection test
4. Hardware Selection & Economic Analysis
✔ 4.1 Gateway MCU Comparison
✔# Option A — ESP32 Gateway (selected)
Benefit
- Low BOM cost (already used across the node and gateway family)
- Low power consumption
- Sufficient for: ESP-NOW/LoRa + WiFi + MQTT client + RAM buffering
Risks
- Limited RAM → the buffer must be bounded
- WiFi can be noisy → reconnect logic must be deterministic
Economics
- Gateway cost can be comparable to a high-end node
- No additional SBC is needed at the gateway location
✔# Option B — Raspberry Pi acting directly as the “gateway” (not selected for the baseline)
Benefit
- Ample resources → buffering & bridging are easy
- Can use both Ethernet and WiFi
Risks
- High power consumption
- Long boot time
- SD corruption risk when power is poor
- Per-site cost rises because the Pi either serves two functions (gateway + server) or an additional Pi is needed
Economics
- More expensive; suitable for very large sites or complex RF requirements.
Decision: keep the ESP32 gateway (consistent with HTL-03, DEV-HTL-02).
✔ 4.2 Raspberry Pi Model Comparison (Server per-site)
Baseline per-site requirements (10–15 nodes, SQLite ingestion, Mosquitto):
- CPU: sufficient for the broker + Python ingestion + a lightweight dashboard
- RAM: 2 GB is sufficient (4 GB is safer)
- Storage: USB SSD recommended (more durable than SD)
✔# Option A — Pi 3
Benefit
- Cheaper
- Sufficient for Mosquitto + SQLite under low load
Risks
- Small CPU margin when a dashboard or OTA copy is running
- Limited I/O; USB SSD performance is lower
✔# Option B — Pi 4 (selected)
Benefit
- Stronger CPU/RAM
- Better I/O (USB3) → more stable SSD operation
- Large margin for growth (alarms, simple Grafana, etc.)
Risks
- Higher price
Decision: Pi 4 is the recommended baseline; 2 GB RAM minimum, 4 GB preferred.
✔ 4.3 Storage (SD vs SSD) — Economic & Reliability
✔# SD Card (not recommended for write-heavy SQLite)
Benefit
- Cheap
- Simple
Risks
- Wear-out
- Corruption on power loss
- Many writes from telemetry
✔# USB SSD (selected)
Benefit
- Better write endurance
- More stable
- Easier recovery
Economics
- Cost goes up, but the risk of downtime/maintenance goes down.
Decision: USB SSD recommended (64 GB minimum). SD only for OS boot if needed.
5. Electrical Integration Overview
DEV-HTL-03 focus: power and LAN connectivity for the gateway and the Pi, not the actuator panel.
✔ 5.1 Functional Integration Diagram (mandatory)
+-------------------+        WiFi/LAN         +----------------------+
| Gateway (ESP32)   |   MQTT over TCP (LAN)   | Raspberry Pi Server  |
| - MQTT Client     |------------------------>| - Mosquitto Broker   |
| - Buffer RAM      |<------------------------| - Ingestion Service  |
| - Health Pub      |   Commands/Config/OTA   | - SQLite (WAL mode)  |
+-------------------+                         +----------------------+
          |                                              |
          | (DEV-HTL-02 boundary)                        | (Optional LAN HMI)
          v                                              v
  Node Field Network                               Smartphone/PC
✔ 5.2 Power Isolation (Gateway vs Pi)
The gateway supply may be shared with the field PSU, provided that:
- a stable buck converter is used (mandatory)
- ripple is low (ESP32 WiFi is sensitive to it)
The Pi should ideally have a separate supply or a small UPS.
✔ 5.3 LAN Wiring & Physical Placement
- Place the Pi near the local router/switch
- Place the gateway near the node area to reduce relay hops
- If the router is far away → use an extender/switch
✔ 5.4 UPS Consideration
Pi: UPS recommended (5 V mini UPS) to prevent SD/SSD corruption. Gateway: optional, but power drops must be covered in the failure matrix.
✔ 5.5 EMI Considerations
- Keep the gateway away from relay coils, contactors, and pump drives
- Use an enclosure with simple shielding if it sits near a motor
- Grounding & surge suppression are bound to HTL-06 (overview only)
6. Software Architecture
6.1 Gateway Modules (ESP32)
+--------------------------------------------------+
| GATEWAY (ESP32) |
| |
| [DEV-HTL-02 boundary] |
| - RadioReceiver (from field link) |
| - FrameParser (binary->struct) |
| |
| +-------------------+ +------------------+ |
| | srv_topic_mapper |----->| drv_mqtt_client | |
| +-------------------+ +------------------+ |
| | | |
| v v |
| +-------------+ +-------------+ |
| | srv_buffer |<----------| drv_wifi | |
| +-------------+ +-------------+ |
| | | |
| v v |
| +-------------------------------+ |
| | app_gateway_bridge (orchestr) | |
| +-------------------------------+ |
| |
| v
| srv_health_publisher (gateway health)
+--------------------------------------------------+
Internal contract:
- app_gateway_bridge receives frames from DEV-HTL-02 (binary), converts them into MQTT payloads (JSON/string), and publishes them.
- If MQTT is down → srv_buffer (implemented as srv_ring_buffer) stores the messages (bounded RAM FIFO).
- On reconnect → flush the FIFO.
6.2 Server Modules (Raspberry Pi)
+--------------------------------------------------------+
| RASPBERRY PI (bare-metal) |
| |
| +--------------------+ +----------------------+ |
| | Mosquitto Broker |<---->| Ingestion Service | |
| +--------------------+ | (paho-mqtt) | |
| +----------+----------+ |
| | |
| v |
| +----------------------+ |
| | SQLite Repository | |
| | (WAL mode) | |
| +----------------------+ |
| |
| +--------------------+ |
| | Command Dispatcher |--> publish command topics |
| +--------------------+ |
+--------------------------------------------------------+
DB engine: SQLite (locked by you). Ingestion: Python (paho-mqtt). Deployment: bare-metal (systemd will be covered in Part 4 if needed).
7. Coding Architecture (3-layer OOP)
7.1 Gateway (ESP32): App → Service → Driver (+ sys)
Mandatory prefixes:
- app_ Application
- srv_ Service
- drv_ Driver
- sys_ System
✔ 7.1.1 Flat Folder (Gateway)
dev-htl-03-gateway/
gateway.ino
sys_time.h
sys_log.h
sys_log.cpp
drv_wifi.h
drv_wifi.cpp
drv_mqtt_client.h
drv_mqtt_client.cpp
srv_ring_buffer.h
srv_ring_buffer.cpp
srv_topic_mapper.h
srv_topic_mapper.cpp
srv_health_publisher.h
srv_health_publisher.cpp
app_gateway_bridge.h
app_gateway_bridge.cpp
7.2 Backend (Pi): Controller → Service → Repository
Role structure:
- Controller: MQTT handler (on_message)
- Service: mapping topic → entity + validation
- Repository: SQLite write/read
✔ 7.2.1 Folder (Server)
dev-htl-03-server/
config.yaml
schema.sql
repository_sqlite.py
service_ingestion.py
controller_mqtt_ingestion.py
service_command_dispatcher.py
cli_publish_command.py
A) FULL CODING — GATEWAY (ESP32, Arduino)
- A.1 sys_time.h
#pragma once
#include <Arduino.h>
static inline uint32_t sys_time_ms() { return millis(); }
- A.2 sys_log.h / sys_log.cpp
// sys_log.h
#pragma once
#include <Arduino.h>
class sys_log {
public:
void add(const String& s) { Serial.println(s); }
};
extern sys_log SYS_LOG;
// sys_log.cpp
#include "sys_log.h"
sys_log SYS_LOG;
- A.3 drv_wifi.h / drv_wifi.cpp
// drv_wifi.h
#pragma once
#include <Arduino.h>
class drv_wifi {
public:
void init(const char* ssid, const char* pass);
void loop();
bool connected() const;
int rssi() const;
private:
const char* ssid_{nullptr};
const char* pass_{nullptr};
uint32_t last_attempt_{0};
};
extern drv_wifi DRV_WIFI;
// drv_wifi.cpp
#include "drv_wifi.h"
#include <WiFi.h>
drv_wifi DRV_WIFI;
void drv_wifi::init(const char* ssid, const char* pass) {
ssid_ = ssid; pass_ = pass;
WiFi.mode(WIFI_STA);
WiFi.setSleep(false);
WiFi.begin(ssid_, pass_);
last_attempt_ = millis();
}
void drv_wifi::loop() {
if (WiFi.status() == WL_CONNECTED) return;
if (millis() - last_attempt_ < 5000) return;
last_attempt_ = millis();
WiFi.disconnect();
WiFi.begin(ssid_, pass_);
}
bool drv_wifi::connected() const { return WiFi.status() == WL_CONNECTED; }
int drv_wifi::rssi() const { return WiFi.RSSI(); }
- A.4 drv_mqtt_client.h / drv_mqtt_client.cpp
Uses PubSubClient.
// drv_mqtt_client.h
#pragma once
#include <Arduino.h>
#include <PubSubClient.h>
#include <WiFiClient.h>
typedef void (*drv_mqtt_msg_cb)(const char* topic, const uint8_t* payload, unsigned int len);
class drv_mqtt_client {
public:
void init(const char* host, uint16_t port, const char* user, const char* pass);
void set_callback(drv_mqtt_msg_cb cb);
void loop();
bool connected() const;
bool publish(const char* topic, const uint8_t* payload, size_t len, bool retain=false, int qos=0);
bool subscribe(const char* topic, int qos=0);
private:
WiFiClient net_;
PubSubClient cli_{net_};
const char* host_{nullptr};
uint16_t port_{1883};
const char* user_{nullptr};
const char* pass_{nullptr};
drv_mqtt_msg_cb cb_{nullptr};
uint32_t last_attempt_{0};
void ensure_connect_();
};
extern drv_mqtt_client DRV_MQTT;
// drv_mqtt_client.cpp
#include "drv_mqtt_client.h"
#include "sys_log.h"
#include "drv_wifi.h"
drv_mqtt_client DRV_MQTT;
void drv_mqtt_client::init(const char* host, uint16_t port, const char* user, const char* pass) {
host_ = host; port_ = port; user_ = user; pass_ = pass;
cli_.setServer(host_, port_);
// PubSubClient's default packet buffer (256 bytes in version 2.8) is smaller than the
// largest buffered message (96-byte topic + 220-byte payload), so enlarge it.
// setBufferSize() requires PubSubClient 2.8 or later.
cli_.setBufferSize(512);
// Forward incoming messages to the callback registered via set_callback().
cli_.setCallback([](char* t, uint8_t* p, unsigned int l){
if (DRV_MQTT.cb_) DRV_MQTT.cb_(t, p, l);
});
}
void drv_mqtt_client::set_callback(drv_mqtt_msg_cb cb) { cb_ = cb; }
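// PubSubClient::connected() is not declared const, so cast away constness of the member here.
bool drv_mqtt_client::connected() const { return const_cast<PubSubClient&>(cli_).connected(); }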
void drv_mqtt_client::ensure_connect_() {
if (cli_.connected()) return;
if (!DRV_WIFI.connected()) return;
if (millis() - last_attempt_ < 3000) return;
last_attempt_ = millis();
String client_id = "htl-gw-" + String((uint32_t)ESP.getEfuseMac(), HEX);
bool ok = false;
if (user_ && pass_) ok = cli_.connect(client_id.c_str(), user_, pass_);
else ok = cli_.connect(client_id.c_str());
if (ok) SYS_LOG.add("mqtt connected");
}
void drv_mqtt_client::loop() {
ensure_connect_();
if (cli_.connected()) cli_.loop();
}
bool drv_mqtt_client::publish(const char* topic, const uint8_t* payload, size_t len, bool retain, int qos) {
// PubSubClient QoS default 0; QoS>0 requires patched client.
// For now: qos ignored here; QoS will be locked in Part 3 and implemented if needed with a QoS1-capable lib.
(void)qos;
if (!cli_.connected()) return false;
return cli_.publish(topic, payload, len, retain);
}
bool drv_mqtt_client::subscribe(const char* topic, int qos) {
// PubSubClient accepts QoS 0 or 1 on subscribe (it returns false for anything higher).
if (!cli_.connected()) return false;
return cli_.subscribe(topic, (uint8_t)qos);
}
Note: true QoS 1 publishing on Arduino usually requires a specific library or patch. QoS is locked in Part 3 (Communication Binding); if QoS 1 is mandatory, we will use a library that supports it or implement an ack layer (in the service layer).
- A.5 srv_ring_buffer.h / srv_ring_buffer.cpp
Bounded store-and-forward buffer (RAM), FIFO.
// srv_ring_buffer.h
#pragma once
#include <Arduino.h>
struct srv_buf_msg {
char topic[96];
uint16_t len;
uint8_t payload[220];
bool retain;
};
class srv_ring_buffer {
public:
void init(uint8_t capacity=50);
bool push(const char* topic, const uint8_t* payload, uint16_t len, bool retain=false);
bool pop(srv_buf_msg& out);
uint8_t size() const { return size_; }
uint8_t capacity() const { return cap_; }
uint32_t dropped() const { return dropped_; }
private:
static const uint8_t MAX_CAP = 60;
srv_buf_msg q_[MAX_CAP];
uint8_t cap_{50};
uint8_t head_{0};
uint8_t tail_{0};
uint8_t size_{0};
uint32_t dropped_{0};
};
extern srv_ring_buffer SRV_BUF;
// srv_ring_buffer.cpp
#include "srv_ring_buffer.h"
#include <string.h>
srv_ring_buffer SRV_BUF;
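void srv_ring_buffer::init(uint8_t capacity) {
// Clamp to [1, MAX_CAP]; a capacity of 0 would cause a modulo-by-zero in push()/pop().
if (capacity == 0) capacity = 1;
cap_ = (capacity > MAX_CAP) ? (uint8_t)MAX_CAP : capacity;
head_ = tail_ = size_ = 0;
dropped_ = 0;
}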
bool srv_ring_buffer::push(const char* topic, const uint8_t* payload, uint16_t len, bool retain) {
if (len > sizeof(q_[0].payload)) return false;
if (size_ >= cap_) {
// drop oldest (FIFO)
tail_ = (tail_ + 1) % cap_;
size_--;
dropped_++;
}
srv_buf_msg &m = q_[head_];
memset(&m, 0, sizeof(m));
strncpy(m.topic, topic, sizeof(m.topic)-1);
m.len = len;
memcpy(m.payload, payload, len);
m.retain = retain;
head_ = (head_ + 1) % cap_;
size_++;
return true;
}
bool srv_ring_buffer::pop(srv_buf_msg& out) {
if (size_ == 0) return false;
out = q_[tail_];
tail_ = (tail_ + 1) % cap_;
size_--;
return true;
}
- A.6 srv_topic_mapper.h / srv_topic_mapper.cpp
Topic mapping (to be locked in Part 3). A consistent builder is provided here.
// srv_topic_mapper.h
#pragma once
#include <Arduino.h>
class srv_topic_mapper {
public:
void set_site(const char* site_id);
// out must be >= 96
void telemetry(char* out, size_t n, uint32_t node_id) const;
void health(char* out, size_t n, uint32_t node_id) const;
void command(char* out, size_t n, uint32_t node_id) const;
void ack(char* out, size_t n, uint32_t node_id) const;
private:
char site_[16] = "site-01";
};
extern srv_topic_mapper SRV_TOPIC;
// srv_topic_mapper.cpp
#include "srv_topic_mapper.h"
#include <stdio.h>
#include <string.h>
srv_topic_mapper SRV_TOPIC;
void srv_topic_mapper::set_site(const char* site_id) {
strncpy(site_, site_id, sizeof(site_)-1);
}
void srv_topic_mapper::telemetry(char* out, size_t n, uint32_t node_id) const {
snprintf(out, n, "htl/%s/telemetry/%08lx", site_, (unsigned long)node_id);
}
void srv_topic_mapper::health(char* out, size_t n, uint32_t node_id) const {
snprintf(out, n, "htl/%s/health/%08lx", site_, (unsigned long)node_id);
}
void srv_topic_mapper::command(char* out, size_t n, uint32_t node_id) const {
snprintf(out, n, "htl/%s/command/%08lx", site_, (unsigned long)node_id);
}
void srv_topic_mapper::ack(char* out, size_t n, uint32_t node_id) const {
snprintf(out, n, "htl/%s/ack/%08lx", site_, (unsigned long)node_id);
}
- A.7 srv_health_publisher.h / srv_health_publisher.cpp
Gateway health to MQTT (uptime, WiFi RSSI, buffer depth, drops).
// srv_health_publisher.h
#pragma once
#include <Arduino.h>
class srv_health_publisher {
public:
void init(uint32_t interval_ms = 10000);
bool due(uint32_t now) const;
void tick(uint32_t now);
private:
uint32_t interval_{10000};
uint32_t next_{0};
};
extern srv_health_publisher SRV_GW_HEALTH;
// srv_health_publisher.cpp
#include "srv_health_publisher.h"
srv_health_publisher SRV_GW_HEALTH;
void srv_health_publisher::init(uint32_t interval_ms) {
interval_ = interval_ms;
next_ = 0;
}
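// Signed difference keeps the comparison correct across the millis() rollover (~49.7 days).
bool srv_health_publisher::due(uint32_t now) const { return (int32_t)(now - next_) >= 0; }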
void srv_health_publisher::tick(uint32_t now) { next_ = now + interval_; }
- A.8 app_gateway_bridge.h / app_gateway_bridge.cpp
This is the orchestrator: it publishes frames arriving from the DEV-HTL-02 boundary (simulated here via push_telemetry()), buffers while MQTT is down, flushes when it is back up, and publishes gateway health.
// app_gateway_bridge.h
#pragma once
#include <Arduino.h>
class app_gateway_bridge {
public:
void init(const char* site_id);
void loop();
// boundary from DEV-HTL-02 (radio->gateway): hand over already-decoded node_id + json payload
void push_telemetry(uint32_t node_id, const char* json);
void push_node_health(uint32_t node_id, const char* json);
private:
void flush_buffer_();
void publish_gateway_health_();
uint32_t last_flush_{0};
};
extern app_gateway_bridge APP_GW;
// app_gateway_bridge.cpp
#include "app_gateway_bridge.h"
#include "srv_ring_buffer.h"
#include "srv_topic_mapper.h"
#include "srv_health_publisher.h"
#include "drv_wifi.h"
#include "drv_mqtt_client.h"
#include "sys_time.h"
#include "sys_log.h"
#include <ArduinoJson.h>
app_gateway_bridge APP_GW;
void app_gateway_bridge::init(const char* site_id) {
SRV_TOPIC.set_site(site_id);
SRV_BUF.init(50);
SRV_GW_HEALTH.init(10000);
}
void app_gateway_bridge::push_telemetry(uint32_t node_id, const char* json) {
char topic[96];
SRV_TOPIC.telemetry(topic, sizeof(topic), node_id);
if (!DRV_MQTT.connected()) {
SRV_BUF.push(topic, (const uint8_t*)json, (uint16_t)strlen(json), false);
return;
}
if (!DRV_MQTT.publish(topic, (const uint8_t*)json, strlen(json), false, 0)) {
SRV_BUF.push(topic, (const uint8_t*)json, (uint16_t)strlen(json), false);
}
}
void app_gateway_bridge::push_node_health(uint32_t node_id, const char* json) {
char topic[96];
SRV_TOPIC.health(topic, sizeof(topic), node_id);
if (!DRV_MQTT.connected()) {
SRV_BUF.push(topic, (const uint8_t*)json, (uint16_t)strlen(json), false);
return;
}
if (!DRV_MQTT.publish(topic, (const uint8_t*)json, strlen(json), false, 0)) {
SRV_BUF.push(topic, (const uint8_t*)json, (uint16_t)strlen(json), false);
}
}
void app_gateway_bridge::flush_buffer_() {
if (!DRV_MQTT.connected()) return;
// flush max N per loop to avoid starvation
for (int i = 0; i < 10; i++) {
srv_buf_msg m;
if (!SRV_BUF.pop(m)) break;
if (!DRV_MQTT.publish(m.topic, m.payload, m.len, m.retain, 0)) {
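// re-queue the message; note that push() appends at the newest end, so this
// message loses its place at the front of the FIFO (drop-oldest still applies)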
SRV_BUF.push(m.topic, m.payload, m.len, m.retain);
break;
}
}
}
void app_gateway_bridge::publish_gateway_health_() {
if (!DRV_MQTT.connected()) return;
StaticJsonDocument<256> doc;
doc["uptime_ms"] = (uint32_t)sys_time_ms();
doc["wifi_rssi"] = DRV_WIFI.rssi();
doc["buf_depth"] = SRV_BUF.size();
doc["buf_cap"] = SRV_BUF.capacity();
doc["buf_dropped"] = SRV_BUF.dropped();
char payload[256];
size_t n = serializeJson(doc, payload, sizeof(payload));
// topic: htl/<site>/health/gateway
char topic[96];
snprintf(topic, sizeof(topic), "htl/%s/health/gateway", "site-01"); // Part 3: lock to mapper
DRV_MQTT.publish(topic, (const uint8_t*)payload, n, false, 0);
}
void app_gateway_bridge::loop() {
uint32_t now = sys_time_ms();
// flush buffer periodically
if (now - last_flush_ >= 200) {
last_flush_ = now;
flush_buffer_();
}
// publish gateway health
if (SRV_GW_HEALTH.due(now)) {
SRV_GW_HEALTH.tick(now);
publish_gateway_health_();
}
}
- A.9 gateway.ino (Gateway main)
Simulates inbound data from DEV-HTL-02 (to be replaced by the radio handler later).
#include <Arduino.h>
#include "sys_log.h"
#include "drv_wifi.h"
#include "drv_mqtt_client.h"
#include "app_gateway_bridge.h"
static const char* WIFI_SSID = "YOUR_SSID";
static const char* WIFI_PASS = "YOUR_PASS";
static const char* MQTT_HOST = "192.168.1.10"; // Pi IP
static const uint16_t MQTT_PORT = 1883;
static const char* MQTT_USER = "htl_gw";
static const char* MQTT_PASSWD = "htl_gw_pass";
static void on_mqtt_msg(const char* topic, const uint8_t* payload, unsigned int len) {
// Part 3: command/config topics will be locked and dispatched to DEV-HTL-02 boundary
(void)topic; (void)payload; (void)len;
}
void setup() {
Serial.begin(115200);
DRV_WIFI.init(WIFI_SSID, WIFI_PASS);
DRV_MQTT.init(MQTT_HOST, MQTT_PORT, MQTT_USER, MQTT_PASSWD);
DRV_MQTT.set_callback(on_mqtt_msg);
APP_GW.init("site-01");
SYS_LOG.add("gateway booted");
}
void loop() {
DRV_WIFI.loop();
DRV_MQTT.loop();
APP_GW.loop();
// SIMULATED inbound telemetry from node (replace with DEV-HTL-02 integration)
static uint32_t last = 0;
if (millis() - last > 2000) {
last = millis();
APP_GW.push_telemetry(0x00000001, "{\"soil\":0.42,\"temp\":28.5}");
}
}
B) FULL CODING — SERVER (Pi, Python, SQLite)
- B.1 schema.sql
PRAGMA journal_mode=WAL;
CREATE TABLE IF NOT EXISTS telemetry (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ts_unix INTEGER NOT NULL,
site_id TEXT NOT NULL,
node_id TEXT NOT NULL,
topic TEXT NOT NULL,
payload_json TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_telemetry_site_node_ts
ON telemetry(site_id, node_id, ts_unix);
CREATE TABLE IF NOT EXISTS health (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ts_unix INTEGER NOT NULL,
site_id TEXT NOT NULL,
node_id TEXT NOT NULL,
topic TEXT NOT NULL,
payload_json TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_health_site_node_ts
ON health(site_id, node_id, ts_unix);
CREATE TABLE IF NOT EXISTS command_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ts_unix INTEGER NOT NULL,
site_id TEXT NOT NULL,
node_id TEXT NOT NULL,
cmd_id TEXT,
topic TEXT NOT NULL,
payload_json TEXT NOT NULL,
status TEXT DEFAULT 'issued'
);
- B.2 repository_sqlite.py
import sqlite3
import time
from typing import Optional

class RepositorySqlite:
    def __init__(self, db_path: str):
        self.db_path = db_path
        self.conn = sqlite3.connect(self.db_path, check_same_thread=False)
        self.conn.execute("PRAGMA journal_mode=WAL;")
        self.conn.execute("PRAGMA synchronous=NORMAL;")
        self.conn.commit()

    def init_schema(self, schema_sql: str):
        self.conn.executescript(schema_sql)
        self.conn.commit()

    def insert_telemetry(self, site_id: str, node_id: str, topic: str, payload_json: str, ts_unix: Optional[int]=None):
        # Use the server clock only when no timestamp is supplied (0 is a valid value)
        ts = int(time.time()) if ts_unix is None else ts_unix
        self.conn.execute(
            "INSERT INTO telemetry(ts_unix, site_id, node_id, topic, payload_json) VALUES(?,?,?,?,?)",
            (ts, site_id, node_id, topic, payload_json)
        )
        self.conn.commit()

    def insert_health(self, site_id: str, node_id: str, topic: str, payload_json: str, ts_unix: Optional[int]=None):
        ts = int(time.time()) if ts_unix is None else ts_unix
        self.conn.execute(
            "INSERT INTO health(ts_unix, site_id, node_id, topic, payload_json) VALUES(?,?,?,?,?)",
            (ts, site_id, node_id, topic, payload_json)
        )
        self.conn.commit()

    def insert_command_log(self, site_id: str, node_id: str, topic: str, payload_json: str, cmd_id: Optional[str]=None):
        ts = int(time.time())
        self.conn.execute(
            "INSERT INTO command_log(ts_unix, site_id, node_id, cmd_id, topic, payload_json) VALUES(?,?,?,?,?,?)",
            (ts, site_id, node_id, cmd_id, topic, payload_json)
        )
        self.conn.commit()
- B.3 service_ingestion.py
from repository_sqlite import RepositorySqlite

class IngestionService:
    def __init__(self, repo: RepositorySqlite):
        self.repo = repo

    @staticmethod
    def parse_topic(topic: str):
        # expected: htl/<site_id>/<kind>/<node_id>
        parts = topic.split("/")
        if len(parts) < 4 or parts[0] != "htl":
            return None
        site_id = parts[1]
        kind = parts[2]  # telemetry | health | command | ack ...
        node_id = parts[3]
        return site_id, kind, node_id

    def ingest(self, topic: str, payload: str):
        parsed = self.parse_topic(topic)
        if not parsed:
            return
        site_id, kind, node_id = parsed
        if kind == "telemetry":
            self.repo.insert_telemetry(site_id, node_id, topic, payload)
        elif kind == "health":
            self.repo.insert_health(site_id, node_id, topic, payload)
        else:
            # keep for later extensions
            pass
- B.4 controller_mqtt_ingestion.py
import pathlib
import yaml
import paho.mqtt.client as mqtt
from repository_sqlite import RepositorySqlite
from service_ingestion import IngestionService

def load_text(path: str) -> str:
    return pathlib.Path(path).read_text(encoding="utf-8")

def main():
    cfg = yaml.safe_load(load_text("config.yaml"))
    repo = RepositorySqlite(cfg["sqlite_db"])
    repo.init_schema(load_text("schema.sql"))
    svc = IngestionService(repo)
    # Written against the paho-mqtt 1.x callback API; paho-mqtt 2.x additionally
    # expects a CallbackAPIVersion argument in mqtt.Client().
    client = mqtt.Client(client_id=cfg.get("client_id", "htl-ingestion"))
    if cfg.get("mqtt_user"):
        client.username_pw_set(cfg["mqtt_user"], cfg["mqtt_pass"])

    def on_connect(c, userdata, flags, rc):
        # (re)subscribe on every connect; exact mapping locked in Part 3
        c.subscribe("htl/+/telemetry/+")
        c.subscribe("htl/+/health/+")
        print("connected rc=", rc)

    def on_message(c, userdata, msg):
        payload = msg.payload.decode("utf-8", errors="replace")
        try:
            svc.ingest(msg.topic, payload)
        except Exception as exc:
            # baseline policy (section 10.5): log the error and keep running
            print("ingest failed:", exc)

    client.on_connect = on_connect
    client.on_message = on_message
    client.connect(cfg["mqtt_host"], cfg.get("mqtt_port", 1883), keepalive=60)
    # loop_forever() blocks and reconnects automatically after a broker outage;
    # a bare client.loop() call does not reconnect on its own.
    client.loop_forever()

if __name__ == "__main__":
    main()
- B.5 service_command_dispatcher.py (minimal publish command)
import json
import paho.mqtt.client as mqtt
from repository_sqlite import RepositorySqlite

class CommandDispatcher:
    def __init__(self, mqtt_host: str, mqtt_port: int, user: str, password: str, repo: RepositorySqlite):
        self.repo = repo
        # Written against the paho-mqtt 1.x API (see the note in B.4 for 2.x)
        self.client = mqtt.Client(client_id="htl-cmd-dispatch")
        if user:
            self.client.username_pw_set(user, password)
        self.client.connect(mqtt_host, mqtt_port, keepalive=60)
        # Run the network loop in a background thread so the QoS 1 handshake can complete
        self.client.loop_start()

    def publish_command(self, site_id: str, node_id: str, cmd: dict):
        topic = f"htl/{site_id}/command/{node_id}"
        payload = json.dumps(cmd, separators=(",", ":"))
        # Commands are QoS 1, not retained (section 8.3)
        info = self.client.publish(topic, payload, qos=1, retain=False)
        info.wait_for_publish()
        self.repo.insert_command_log(site_id, node_id, topic, payload, cmd.get("cmd_id"))
- B.6 cli_publish_command.py (manual tool)
import argparse
import json
from repository_sqlite import RepositorySqlite
from service_command_dispatcher import CommandDispatcher

def main():
    ap = argparse.ArgumentParser()
    ap.add_argument("--host", default="127.0.0.1")
    ap.add_argument("--port", type=int, default=1883)
    ap.add_argument("--user", default="")
    ap.add_argument("--passw", default="")
    ap.add_argument("--db", default="/opt/hortilink/hortilink.db")
    ap.add_argument("--site", required=True)
    ap.add_argument("--node", required=True)
    ap.add_argument("--cmd", required=True, help='json string, ex {"cmd_id":"1","action":"pump_on","ttl":30}')
    args = ap.parse_args()
    repo = RepositorySqlite(args.db)
    disp = CommandDispatcher(args.host, args.port, args.user, args.passw, repo)
    disp.publish_command(args.site, args.node, json.loads(args.cmd))

if __name__ == "__main__":
    main()
- B.7 config.yaml
mqtt_host: '127.0.0.1'
mqtt_port: 1883
mqtt_user: 'htl_srv'
mqtt_pass: 'htl_srv_pass'
client_id: 'htl-ingestion'
sqlite_db: '/opt/hortilink/hortilink.db'
8. Communication Binding (FINAL LOCK)
8.2 Topic Map (FINAL)
| Purpose | Topic Pattern | Publisher | Subscriber |
|---|---|---|---|
| Telemetry | htl/<site>/telemetry/<node_id> | Gateway | Server |
| Node Health | htl/<site>/health/<node_id> | Gateway | Server |
| Gateway Health | htl/<site>/health/gateway | Gateway | Server |
| Command | htl/<site>/command/<node_id> | Server | Gateway |
| ACK | htl/<site>/ack/<node_id> | Gateway | Server |
| Config | htl/<site>/config/<node_id> | Server | Gateway |
| OTA Metadata | htl/<site>/ota/<node_id> | Server | Gateway |
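The topic pattern in the table above can be exercised with a small helper. This is a minimal sketch, not part of the shipped server code: the names `KINDS`, `build_topic`, and `parse_topic_strict` are hypothetical, and the strict four-segment check is an assumption that is tighter than `IngestionService.parse_topic`.

```python
# Hypothetical helpers illustrating the htl/<site>/<kind>/<node_id> pattern.
KINDS = {"telemetry", "health", "command", "ack", "config", "ota"}


def build_topic(site_id: str, kind: str, node_id: str) -> str:
    """Build a topic; reject unknown kinds and MQTT wildcard/separator characters."""
    if kind not in KINDS:
        raise ValueError(f"unknown kind: {kind}")
    for part in (site_id, node_id):
        if not part or any(ch in part for ch in "/+#"):
            raise ValueError(f"invalid topic segment: {part!r}")
    return f"htl/{site_id}/{kind}/{node_id}"


def parse_topic_strict(topic: str):
    """Return (site_id, kind, node_id), or None if the topic is off-pattern."""
    parts = topic.split("/")
    if len(parts) != 4 or parts[0] != "htl" or parts[2] not in KINDS:
        return None
    return parts[1], parts[2], parts[3]


if __name__ == "__main__":
    topic = build_topic("site-01", "telemetry", "00000001")
    print(topic)
    print(parse_topic_strict(topic))
    print(parse_topic_strict("htl/site-01/health/gateway"))
```

Rejecting `/`, `+`, and `#` inside a segment keeps a malformed site or node ID from silently turning into a different topic or a wildcard.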
8.3 QoS Rules (LOCKED)
| Message Type | QoS | Retain |
|---|---|---|
| Telemetry | 0 | false |
| Node Health | 0 | false |
| Gateway Health | 0 | false |
| Command | 1 | false |
| ACK | 1 | false |
| Config | 1 | true |
| OTA Metadata | 1 | true |
Rationale:
- Telemetry may be lost (bounded retry in the field).
- Command & ACK must not be lost → QoS 1.
- Config & OTA must be retained so that a new node receives the latest version.
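One way to keep server-side publishers aligned with these rules is a single lookup table. The sketch below copies the values from the QoS table; `QOS_RULES` and `publish_params` are hypothetical names, not existing project code.

```python
# QoS/retain rules copied from the table above; the names here are hypothetical.
QOS_RULES = {
    "telemetry": (0, False),
    "health": (0, False),   # node health and gateway health share this kind
    "command": (1, False),
    "ack": (1, False),
    "config": (1, True),
    "ota": (1, True),
}


def publish_params(topic: str) -> dict:
    """Return the qos/retain keyword arguments for an htl/<site>/<kind>/<id> topic."""
    parts = topic.split("/")
    if len(parts) != 4 or parts[0] != "htl":
        raise ValueError(f"not an HTL topic: {topic}")
    if parts[2] not in QOS_RULES:
        raise ValueError(f"no QoS rule for kind: {parts[2]}")
    qos, retain = QOS_RULES[parts[2]]
    return {"qos": qos, "retain": retain}


if __name__ == "__main__":
    print(publish_params("htl/site-01/command/00000001"))
    print(publish_params("htl/site-01/config/00000001"))
```

With paho-mqtt the result can be passed straight through, for example `client.publish(topic, payload, **publish_params(topic))`.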
8.4 MQTT Keepalive & Session Policy
Gateway:
- Clean session: true
- Keepalive: 60s
- Reconnect delay: 3s backoff
Server ingestion:
- Clean session: true
- QoS per subscription as per the table
- Reconnect loop non-blocking
8.5 ACL Binding (Mosquitto)
File: /etc/mosquitto/acl_htl.conf
user htl_gw
topic read htl/+/command/+
topic read htl/+/config/+
topic read htl/+/ota/+
topic write htl/+/telemetry/+
topic write htl/+/health/+
topic write htl/+/ack/+
user htl_srv
topic readwrite htl/#
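To reason about what this ACL permits before deploying it, the rules can be mirrored in a few lines of Python. This is a simplified model for review and unit tests only, assuming standard MQTT wildcard semantics; it is not how Mosquitto evaluates ACLs internally, and `ACL`, `topic_matches`, and `allowed` are hypothetical names.

```python
# Simplified model of the ACL above (illustration only).
ACL = {
    "htl_gw": {
        "read": ["htl/+/command/+", "htl/+/config/+", "htl/+/ota/+"],
        "write": ["htl/+/telemetry/+", "htl/+/health/+", "htl/+/ack/+"],
    },
    "htl_srv": {"read": ["htl/#"], "write": ["htl/#"]},
}


def topic_matches(pattern: str, topic: str) -> bool:
    """MQTT-style match: '+' matches one level, '#' matches all remaining levels."""
    p_parts = pattern.split("/")
    t_parts = topic.split("/")
    for i, p in enumerate(p_parts):
        if p == "#":
            return True
        if i >= len(t_parts):
            return False
        if p != "+" and p != t_parts[i]:
            return False
    return len(p_parts) == len(t_parts)


def allowed(user: str, access: str, topic: str) -> bool:
    """access is 'read' (subscribe) or 'write' (publish)."""
    patterns = ACL.get(user, {}).get(access, [])
    return any(topic_matches(p, topic) for p in patterns)


if __name__ == "__main__":
    print(allowed("htl_gw", "write", "htl/site-01/telemetry/00000001"))
    print(allowed("htl_gw", "write", "htl/site-01/command/00000001"))
    print(allowed("htl_gw", "write", "htl/site-01/health/gateway"))
```

The third check confirms that the gateway health topic is covered by the `htl/+/health/+` write rule, so no extra ACL line is needed for it.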
8.6 Mosquitto Config (Minimal)
File: /etc/mosquitto/conf.d/hortilink.conf
listener 1883
allow_anonymous false
password_file /etc/mosquitto/passwd_htl
acl_file /etc/mosquitto/acl_htl.conf
persistence true
persistence_location /var/lib/mosquitto/
autosave_interval 30
Password set via:
mosquitto_passwd -c /etc/mosquitto/passwd_htl htl_gw
mosquitto_passwd /etc/mosquitto/passwd_htl htl_srv
8.7 Update Gateway Code (QoS + Retain Alignment)
Because PubSubClient publishes at QoS 0 only, the QoS 1 rules are handled as follows:
- Gateway publishes use publish(topic, payload) (QoS 0)
- On the command path (subscribe), the gateway can still receive at QoS 1
- If QoS 1 publish is mandatory, use a library that supports it (e.g. AsyncMqttClient)
For the current baseline:
Gateway side:
// subscribe command/config/ota at QoS 1 (section 8.3)
DRV_MQTT.subscribe("htl/site-01/command/+", 1);
DRV_MQTT.subscribe("htl/site-01/config/+", 1);
DRV_MQTT.subscribe("htl/site-01/ota/+", 1);
Publish config/ota retained (if needed):
DRV_MQTT.publish(topic, payload, len, true /*retain*/, 0);
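Note: the broker cannot raise the QoS of a message that was published at QoS 0, so while the library is limited the gateway → broker hop for ACK stays at QoS 0. End-to-end QoS 1 then needs a QoS 1-capable library or the service-level ack layer mentioned in A.4.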
9. Lifecycle Model
This section locks down how the Gateway & Server operate.
9.1 Gateway Boot Sequence
BOOT
↓
WiFi INIT
↓
WiFi CONNECT
↓
MQTT INIT
↓
MQTT CONNECT
↓
SUBSCRIBE COMMAND/CONFIG/OTA
↓
BRIDGE ACTIVE
9.2 Gateway Runtime Loop
Pseudo-flow (the real code is in loop()):
DRV_WIFI.loop();
DRV_MQTT.loop();
if (!DRV_MQTT.connected()) {
// buffer only
} else {
// flush buffer (max 10 messages per pass)
}
// publish gateway health every 10 s
9.3 Reconnect Logic (LOCK)
WiFi:
- Retry every 5s
MQTT:
- Retry every 3s
- Do not block main loop
- After reconnect → resubscribe
Implemented in drv_mqtt_client::ensure_connect_() (already present).
Add a resubscribe after connect (QoS 1 for command/config/ota, per section 8.3):
if (ok) {
cli_.subscribe("htl/site-01/command/+", 1);
cli_.subscribe("htl/site-01/config/+", 1);
cli_.subscribe("htl/site-01/ota/+", 1);
}
9.4 Server Lifecycle
Server service (controller_mqtt_ingestion.py):
START
↓
CONNECT MQTT
↓
SUBSCRIBE telemetry + health
↓
WAIT LOOP
↓
WRITE SQLite
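If the broker goes down:
- paho reconnects automatically only when the network loop runs via loop_forever() or loop_start(); a bare client.loop() call does not reconnect by itself
- no blocking sleep > 1 s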
9.5 End-to-End Command Flow
- CLI publishes command → htl/site-01/command/node01
- Gateway receives → forwards to the DEV-HTL-02 boundary
- Node executes → sends ACK (radio)
- Gateway publishes ACK → htl/site-01/ack/node01
- Server receives → marks command_log
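The last step of the flow (server marks `command_log`) is not implemented in the B section code. The following is a minimal sketch under two assumptions that the source does not confirm: the ACK payload is JSON carrying the same `cmd_id` as the command, and the new status value is `'acked'`. The function name `mark_acked` is hypothetical; the table layout follows schema.sql.

```python
import json
import sqlite3


def mark_acked(conn: sqlite3.Connection, site_id: str, node_id: str, ack_payload: str) -> int:
    """Set status='acked' for the matching issued command; return rows updated."""
    try:
        ack = json.loads(ack_payload)
    except json.JSONDecodeError:
        return 0
    cmd_id = ack.get("cmd_id") if isinstance(ack, dict) else None
    if cmd_id is None:
        return 0
    cur = conn.execute(
        "UPDATE command_log SET status='acked' "
        "WHERE site_id=? AND node_id=? AND cmd_id=? AND status='issued'",
        (site_id, node_id, str(cmd_id)),
    )
    conn.commit()
    return cur.rowcount


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE command_log (id INTEGER PRIMARY KEY AUTOINCREMENT, "
        "ts_unix INTEGER NOT NULL, site_id TEXT NOT NULL, node_id TEXT NOT NULL, "
        "cmd_id TEXT, topic TEXT NOT NULL, payload_json TEXT NOT NULL, "
        "status TEXT DEFAULT 'issued')"
    )
    conn.execute(
        "INSERT INTO command_log(ts_unix, site_id, node_id, cmd_id, topic, payload_json) "
        "VALUES (0, 'site-01', 'node01', '1', 'htl/site-01/command/node01', '{}')"
    )
    print(mark_acked(conn, "site-01", "node01", '{"cmd_id": "1"}'))
    print(conn.execute("SELECT status FROM command_log").fetchone()[0])
```

To wire this in, the ingestion controller would also need to subscribe to `htl/+/ack/+` and route the `ack` kind to such a function.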
9.6 Buffer Flush Policy (LOCK)
- Flush at most 10 messages per loop
- If a publish fails → push back
- Drop the oldest message if the buffer is full
- Buffer capacity: 50 messages
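The policy above can be modelled on a PC to check the numbers before touching firmware. This is a sketch in Python, not the ESP32 code, and `BoundedFifo` is a hypothetical name. It deliberately differs from `flush_buffer_()` in one respect: on a failed publish it leaves the message at the front of the queue instead of popping and re-pushing it, so ordering is preserved.

```python
from collections import deque


class BoundedFifo:
    """Drop-oldest FIFO with a per-pass flush limit (model of the gateway policy)."""

    def __init__(self, capacity: int = 50):
        self.q = deque()
        self.capacity = capacity
        self.dropped = 0

    def push(self, msg) -> None:
        if len(self.q) >= self.capacity:
            self.q.popleft()      # buffer full: drop the oldest message
            self.dropped += 1
        self.q.append(msg)

    def flush(self, publish, max_per_pass: int = 10) -> int:
        """Send up to max_per_pass messages; stop at the first failure."""
        sent = 0
        while self.q and sent < max_per_pass:
            if not publish(self.q[0]):
                break             # message stays at the front, order preserved
            self.q.popleft()
            sent += 1
        return sent


if __name__ == "__main__":
    buf = BoundedFifo(capacity=50)
    for i in range(60):           # 60 messages arrive while the broker is down
        buf.push(i)
    print(buf.dropped, len(buf.q))
    print(buf.flush(lambda m: True), len(buf.q))
```

Peeking before popping needs a `peek()`-style accessor that `srv_ring_buffer` does not currently expose; whether to add one is a design decision for the gateway team.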
9.7 Degraded Mode
If:
- WiFi down
- MQTT down
Then:
- Only buffer
- Publish health locally only
- Do not crash
- Recover automatically
10. Failure Handling Implementation
Mandatory format: Detection → Impact → Recovery
✔ 10.1 Broker Crash (Mosquitto stop/restart)
Detection (Gateway)
- DRV_MQTT.connected() == false
- publish returns false
Impact
- Telemetry does not reach the server
- Commands from the server are not received by the gateway
- ACKs are not delivered to the server
Recovery (Gateway)
- All outgoing messages go into SRV_BUF (store-and-forward)
- drv_mqtt_client.ensure_connect_() retries every 3 s
- On reconnect: resubscribe to command/+, config/+, ota/+
- Flush FIFO: max 10 messages per loop
Detection (Server Ingestion)
- MQTT client disconnect / on_disconnect
Recovery (Server)
- Auto reconnect (paho)
- Resubscribe in on_connect
✔ 10.2 Pi Reboot / Power Loss
Detection
- MQTT disconnect at the gateway
- ping fails (optional)
Impact
- Same as a broker crash, plus ingestion downtime
Recovery
- The gateway RAM buffer survives as long as the gateway does not reboot
- When the Pi comes back up: the gateway reconnects + flushes
- Policy: if the gateway also reboots → unsent data is lost (RAM-only). This is acceptable for the baseline; if it is not acceptable → revise the Part 2 buffer to flash (HTL-03/04 binding).
✔ 10.3 LAN Down / WiFi Disconnect
Detection (Gateway)
- DRV_WIFI.connected() == false
Impact
- MQTT cannot connect
- The buffer grows
Recovery
- WiFi reconnect every 5 s
- If the buffer is full: drop oldest (bounded)
✔ 10.4 Buffer Full (Gateway)
Detection
- SRV_BUF.size() == SRV_BUF.capacity()
Impact
- The oldest telemetry/health data is dropped
Recovery
- The drop-oldest policy runs automatically
- buf_dropped is published in the gateway health message (monitoring)
Mandatory alarm threshold (server side)
- If buf_dropped increases → severity Warning
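The server-side alarm rule can be sketched as a small stateful check over successive gateway health payloads. This is an illustration only: `DropMonitor` is a hypothetical name, and treating a lower counter as a gateway reboot (no alarm) is an assumption, since `buf_dropped` lives in RAM and restarts from 0.

```python
import json


class DropMonitor:
    """Tracks the last buf_dropped value per site and flags increases."""

    def __init__(self):
        self.last = {}

    def check(self, site_id: str, health_payload: str):
        """Return a warning string if buf_dropped increased, otherwise None."""
        doc = json.loads(health_payload)
        dropped = int(doc.get("buf_dropped", 0))
        prev = self.last.get(site_id)
        self.last[site_id] = dropped
        if prev is None:
            return None           # first sample, nothing to compare against
        if dropped > prev:
            return f"WARNING: {site_id} gateway dropped {dropped - prev} message(s)"
        return None               # unchanged, or counter reset after a reboot


if __name__ == "__main__":
    mon = DropMonitor()
    print(mon.check("site-01", '{"buf_depth": 0, "buf_dropped": 0}'))
    print(mon.check("site-01", '{"buf_depth": 50, "buf_dropped": 3}'))
```

The payloads would come from the `htl/<site>/health/gateway` topic that the ingestion service already subscribes to via `htl/+/health/+`.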
✔ 10.5 DB Write Failure (SQLite locked / disk full)
Detection (Server)
- sqlite3 exception on insert
- OS disk usage > 95%
Impact
- Telemetry/health data is not stored
- MQTT keeps running, however (data is lost if the server has no buffering)
Recovery (Server)
- Baseline: log the error + continue (do not crash)
- If the disk is full: stop ingestion + raise an alarm on topic htl/<site>/health/server (to be covered in DEV-HTL-05/monitoring), or at minimum print & exit so that systemd restarts the service (Part 4 optional)
Operational action
- Rotate/clean the DB / move it to an SSD / extend storage
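The baseline "log error + continue" behaviour and the 95% disk check can be sketched with the standard library. The table name `telemetry`, the `node_id` column, and the function names are assumptions for illustration; the real columns come from schema.sql.

```python
import logging
import shutil
import sqlite3

log = logging.getLogger("ingestion")  # hypothetical logger name

DISK_FULL_RATIO = 0.95  # detection threshold from the list above


def disk_is_full(path: str, threshold: float = DISK_FULL_RATIO) -> bool:
    """True when the filesystem holding `path` is used beyond the threshold."""
    usage = shutil.disk_usage(path)
    return usage.used / usage.total > threshold


def store_telemetry(conn: sqlite3.Connection, node_id: str, payload_json: str) -> bool:
    """Insert one row; on any sqlite3 error, log and continue instead of crashing."""
    try:
        with conn:  # commits on success, rolls back on exception
            conn.execute(
                # Assumption: table and column names are illustrative only.
                "INSERT INTO telemetry (node_id, payload_json) VALUES (?, ?)",
                (node_id, payload_json),
            )
        return True
    except sqlite3.Error as exc:  # covers "database is locked" and "disk is full"
        log.error("telemetry insert failed for %s: %s", node_id, exc)
        return False
```

A caller could check `disk_is_full()` periodically and, when it returns `True`, stop ingestion and exit with a non-zero status so that systemd restarts the service once storage has been cleaned up.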
11. Security Implementation
Binding: HTL-07.
✔ 11.1 MQTT Authentication (Mandatory)
- allow_anonymous false
- Separate users: htl_gw (gateway), htl_srv (server services)
Gateway credential storage
- Baseline: hardcoded (lab), NOT for production
- Production: store in NVS (DEV-HTL-01/02 style) or as a per-site compile-time secret.
✔ 11.2 MQTT ACL (Mandatory)
The ACL file locked in Part 3 must be applied.
Main rules
- Gateway may only publish telemetry/health/ack
- Gateway may only subscribe to command/config/ota
- Server may read/write all topics under htl/#
✔ 11.3 TLS Policy (LAN Optional, Phase Gate)
Baseline (initial phase): non-TLS on the LAN. If the LAN is considered untrusted → TLS is mandatory.
If TLS is enabled:
- Mosquitto TLS listener on port 8883
- Gateway uses a TLS client (needs a cert store; RAM overhead)
Initial-phase decision: TLS optional (already stated in HTL-07).
✔ 11.4 Replay / Duplicate (MQTT)
MQTT replay mitigation:
- Telemetry carries seq in the payload (coming from DEV-HTL-02)
- Server ingestion does not need to dedup (baseline), but must store payload_json so that analysis can dedup offline.
If server-side dedup is needed: implement last_seq per node in SQLite (future patch).
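One possible shape for the last_seq future patch is sketched below. It is not part of the baseline: the table `node_last_seq` and the function names are hypothetical, and the sketch deliberately ignores seq wraparound and counters that restart from zero after a node reboot, both of which a real patch would have to handle.

```python
import sqlite3


def init_dedup(conn: sqlite3.Connection) -> None:
    """Create the per-node bookkeeping table (hypothetical name)."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS node_last_seq ("
        "node_id TEXT PRIMARY KEY, last_seq INTEGER NOT NULL)"
    )


def is_new(conn: sqlite3.Connection, node_id: str, seq: int) -> bool:
    """Return True and record seq if it is newer than the last one stored for the node."""
    row = conn.execute(
        "SELECT last_seq FROM node_last_seq WHERE node_id = ?", (node_id,)
    ).fetchone()
    if row is not None and seq <= row[0]:
        return False  # duplicate or replayed message
    with conn:
        conn.execute(
            "INSERT OR REPLACE INTO node_last_seq (node_id, last_seq) VALUES (?, ?)",
            (node_id, seq),
        )
    return True
```

Ingestion would call `is_new()` before the telemetry insert and skip the row when it returns `False`.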
12. Testing Hook (HTL-09 Reference)
All of these tests must be runnable in the lab without the cloud.
✔ 12.1 Broker Stop/Start Test
Steps
- Start gateway + server ingestion
- Stop mosquitto for 60 s
- Start mosquitto again
Expected
- Gateway buffer depth rises while the broker is down
- After the broker is up → buffer drains to 0
- No crash
Pass
- buf_dropped = 0 for a 60 s downtime with 2 s telemetry and 15 nodes, if buffer capacity is sufficient
- Note: 15 nodes @ 2 s → 7.5 msg/s → 60 s = 450 msg. A buffer of 50 is not enough, so this test must use throttling, a flash buffer, or accept drops.
- For the baseline RAM buffer of 50: the telemetry interval must be raised while the broker is down (future improvement), or the buffer cap increased if RAM allows.
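The sizing arithmetic in the note can be reproduced with a short calculation. The function name and return layout are illustrative, and the model counts telemetry only (gateway health messages would add to the load).

```python
def outage_backlog(nodes: int, interval_s: float, outage_s: float, capacity: int):
    """Estimate buffer pressure during a broker outage (telemetry only)."""
    rate = nodes / interval_s                    # messages per second
    produced = rate * outage_s                   # messages generated while down
    dropped = max(0.0, produced - capacity)      # lost to drop-oldest
    seconds_to_full = capacity / rate            # time until the first drop
    min_interval_s = nodes * outage_s / capacity # interval needed for zero drops
    return rate, produced, dropped, seconds_to_full, min_interval_s


rate, produced, dropped, seconds_to_full, min_interval_s = outage_backlog(15, 2.0, 60.0, 50)
print(rate, produced, dropped)     # 7.5 450.0 400.0
print(round(seconds_to_full, 1))   # 6.7
print(min_interval_s)              # 18.0
```

With the baseline numbers the buffer fills in under 7 s, and the telemetry interval would have to be stretched to about 18 s per node during the outage for a 60 s downtime to pass with zero drops.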
✔ 12.2 Buffer Overflow Test
Setup
- Set telemetry to fast (0.5 s)
- Shut down the broker for 2 minutes
Expected
- buf_dropped increases
- Gateway stays alive
- Reconnect succeeds
✔ 12.3 End-to-End Command Test
Steps
- Publish a command via cli_publish_command.py
- Gateway must receive it and forward it to the DEV-HTL-02 boundary
- Publish the ACK to the server topic
Expected
- A command_log entry is created
- The ACK is received by the server (an ack subscriber will be added in Part 2 if needed; minimum baseline: log)
✔ 12.4 DB Integrity Check
Steps
- Insert 100 telemetry rows
- Restart the ingestion service
- Verify the row count in SQLite
Expected
- Data persists
- WAL mode is active
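The check can be automated with the standard library alone. In this sketch, closing and reopening the connection stands in for the ingestion service restart, and the `telemetry` table with `node_id` and `payload_json` columns is an assumed stand-in for the real schema.sql.

```python
import sqlite3


def run_integrity_check(db_path: str, rows: int = 100):
    """Insert rows, reopen the database, and report (row count, journal mode)."""
    conn = sqlite3.connect(db_path)
    conn.execute("PRAGMA journal_mode=WAL")  # WAL setting is persisted in the DB file
    # Assumption: illustrative table; the real one is defined in schema.sql.
    conn.execute(
        "CREATE TABLE IF NOT EXISTS telemetry (node_id TEXT, payload_json TEXT)"
    )
    with conn:  # single committed transaction
        conn.executemany(
            "INSERT INTO telemetry (node_id, payload_json) VALUES (?, ?)",
            [("node-1", '{"seq": %d}' % i) for i in range(rows)],
        )
    conn.close()  # stands in for stopping the ingestion service

    conn = sqlite3.connect(db_path)  # stands in for the service restart
    count = conn.execute("SELECT COUNT(*) FROM telemetry").fetchone()[0]
    mode = conn.execute("PRAGMA journal_mode").fetchone()[0]
    conn.close()
    return count, mode
```

On a fresh database file the test passes when the function returns a count of 100 and the journal mode `wal`.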
✔ 12.5 LAN Down Test
Steps
- Disconnect the gateway's WiFi for 2 minutes
- Restore
Expected
- Gateway reconnects
- No watchdog reset
- Buffer drop policy runs
13. Definition of Done
DEV-HTL-03 is considered done when:
- Gateway publishes telemetry/health to the broker per the topic map (Part 3)
- Server ingestion writes to SQLite using schema.sql
- Gateway subscribes to command/config/ota (Part 3) and callbacks reach the handler (DEV-HTL-02 boundary)
- Store-and-forward buffer works and is bounded (drop-oldest)
- Non-blocking reconnect strategy works (WiFi + MQTT)
- Mosquitto config + ACL + passwd are applied
- HTL-09 integration-category tests pass: broker stop/start, LAN down, DB write, minimal command flow
- No crash during a 24-hour soak (baseline)
14. Revision History
| Version | Date | Author | Description |
|---|---|---|---|
| v0.1.0 | 2026-02-25 | HTL | Initial DEV-HTL-03 baseline: MQTT bridge, SQLite ingestion, contract lock, failure/security/testing |
Authoring Note: This article was prepared as educational material and a general reference, based on various literature sources, field practice, and writing-assistance tools. Readers are advised to verify the content further and adapt it to the conditions and needs of their own systems.