Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 135 additions & 0 deletions docs/plugins/core-engines-configuration.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
# EventMesh Core Engines Configuration Guide

EventMesh provides powerful core engines (`Filter`, `Transformer`, `Router`) to dynamically process messages. These engines are configured via **MetaStorage** (Governance Center, e.g., Nacos, Etcd), supporting on-demand loading and hot-reloading.

## 0. Core Concepts

Before configuration, it is important to understand the specific role of each engine in the message flow:

* **Filter (The Gatekeeper)**: Decides **"Whether to pass"**.
* It inspects the message (CloudEvent) attributes. If the message matches the rules, it passes; otherwise, it is dropped.
* *Use Case*: Block debug logs from production traffic; Only subscribe to specific event types.

* **Transformer (The Translator)**: Decides **"What it looks like"**.
* It modifies the message content (Payload or Metadata) according to templates or scripts.
* *Use Case*: Convert XML to JSON; Mask sensitive data (PII); Adapt legacy protocols to new standards.

* **Router (The Dispatcher)**: Decides **"Where to go"**.
* It dynamically changes the destination (Topic) of the message.
* *Use Case*: Route traffic to a Canary/Gray release topic; Route high-priority orders to a dedicated queue.

---

## 1. Overview

The configuration is not in local property files but distributed via the MetaStorage. EventMesh listens to specific **Keys** based on client Groups.

- **Data Source**: Configured via `eventMesh.metaStorage.plugin.type`.
- **Loading Mechanism**: Lazy loading & Hot-reloading.
- **Key Format**: `{EnginePrefix}-{GroupName}`.
- **Value Format**: JSON Array.

| Engine | Prefix | Scope | Description |
| :--- | :--- | :--- | :--- |
| **Router** | `router-` | Pub Only | Routes messages to different topics. |
| **Filter** | `filter-` | Pub & Sub | Filters messages based on CloudEvent attributes. |
| **Transformer** | `transformer-` | Pub & Sub | Transforms message content (Payload/Header). |

---

## 2. Router (Routing)

**Scope**: Publish Only (Upstream)
**Key**: `router-{producerGroup}`

Decides the target storage topic for a message sent by a producer.

### Configuration Example (JSON)

```json
[
{
"topic": "original-topic",
"routerConfig": {
"targetTopic": "redirect-topic",
"expression": "data.type == 'urgent'"
}
}
]
```

* **topic**: The original topic the producer sends to.
* **targetTopic**: The actual topic to write to Storage.
* **expression**: Condition to trigger routing (e.g., SpEL).

---

## 3. Filter (Filtering)

**Scope**: Both Publish (Upstream) & Subscribe (Downstream)

### A. Publish Side (Upstream)
**Key**: `filter-{producerGroup}`
**Effect**: Intercepts messages **before** they are sent to Storage.

### B. Subscribe Side (Downstream)
**Key**: `filter-{consumerGroup}`
**Effect**: Intercepts messages **before** they are pushed to the Consumer.

### Configuration Example (JSON)

```json
[
{
"topic": "test-topic",
"filterPattern": {
"source": ["app-a", "app-b"],
"type": [{"prefix": "com.example"}]
}
}
]
```

* **filterPattern**: Rules matching CloudEvent attributes. If a message doesn't match, it is dropped.

---

## 4. Transformer (Transformation)

**Scope**: Both Publish (Upstream) & Subscribe (Downstream)

### A. Publish Side (Upstream)
**Key**: `transformer-{producerGroup}`
**Effect**: Modifies message content **before** sending to Storage.

### B. Subscribe Side (Downstream)
**Key**: `transformer-{consumerGroup}`
**Effect**: Modifies message content **before** pushing to the Consumer.

### Configuration Example (JSON)

```json
[
{
"topic": "raw-topic",
"transformerConfig": {
"transformerType": "template",
"template": "{\"id\": \"${id}\", \"new_content\": \"${data.content}\"}"
}
}
]
```

* **transformerType**: e.g., `original`, `template`.
* **template**: The transformation template definition.

---

## 5. Verification

1. **Publish Config**: Add the JSON config to your Governance Center (e.g., Nacos) with the Data ID `router-MyGroup`.
2. **Send Message**: Use EventMesh SDK to send a message from `MyGroup`.
3. **Observe**:
* For **Router**: Check if the message appears in the `targetTopic` in your MQ.
* For **Filter**: Check if blocked messages are skipped.
* For **Transformer**: Check if the message body in MQ (for Pub) or Consumer (for Sub) is modified.
Original file line number Diff line number Diff line change
Expand Up @@ -25,31 +25,31 @@
import java.io.InputStream;
import java.io.OutputStream;

public class Base64Utils {
private static final int CACHE_SIZE = 1024;
import lombok.experimental.UtilityClass;

public Base64Utils() {
}
@UtilityClass
public class Base64Utils {
private final int CACHE_SIZE = 1024;

public static byte[] decode(String base64) throws Exception {
public byte[] decode(String base64) throws Exception {
return Base64.decode(base64.toCharArray());
}

public static String encode(byte[] bytes) throws Exception {
public String encode(byte[] bytes) throws Exception {
return new String(Base64.encode(bytes));
}

public static String encodeFile(String filePath) throws Exception {
public String encodeFile(String filePath) throws Exception {
byte[] bytes = fileToByte(filePath);
return encode(bytes);
}

public static void decodeToFile(String filePath, String base64) throws Exception {
public void decodeToFile(String filePath, String base64) throws Exception {
byte[] bytes = decode(base64);
byteArrayToFile(bytes, filePath);
}

public static byte[] fileToByte(String filePath) throws Exception {
public byte[] fileToByte(String filePath) throws Exception {
byte[] data = new byte[0];
File file = new File(filePath);
if (file.exists()) {
Expand All @@ -71,7 +71,7 @@ public static byte[] fileToByte(String filePath) throws Exception {
return data;
}

public static void byteArrayToFile(byte[] bytes, String filePath) throws Exception {
public void byteArrayToFile(byte[] bytes, String filePath) throws Exception {
InputStream in = new ByteArrayInputStream(bytes);
File destFile = new File(filePath);
if (!destFile.getParentFile().exists()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@
import java.io.FileReader;
import java.io.IOException;

import lombok.experimental.UtilityClass;

@UtilityClass
public class EncryptUtil {
public EncryptUtil() {
}

private static byte[] hexStringToBytes(String hexString) {
private byte[] hexStringToBytes(String hexString) {
if (hexString != null && !hexString.equals("")) {
hexString = hexString.toUpperCase();
int length = hexString.length() / 2;
Expand All @@ -44,7 +45,7 @@ private static byte[] hexStringToBytes(String hexString) {
}
}

public static String byteToHexString(byte[] b) {
public String byteToHexString(byte[] b) {
String a = "";

for (int i = 0; i < b.length; ++i) {
Expand All @@ -59,11 +60,11 @@ public static String byteToHexString(byte[] b) {
return a;
}

private static byte charToByte(char c) {
private byte charToByte(char c) {
return (byte) "0123456789ABCDEF".indexOf(c);
}

private static String readFileContent(String filePath) {
private String readFileContent(String filePath) {
File file = new File(filePath);
BufferedReader reader = null;
StringBuffer key = new StringBuffer();
Expand Down Expand Up @@ -100,7 +101,7 @@ private static String readFileContent(String filePath) {
return key.toString();
}

public static String decrypt(String sysPubKeyFile, String appPrivKeyFile, String encStr) throws Exception {
public String decrypt(String sysPubKeyFile, String appPrivKeyFile, String encStr) throws Exception {
String pubKeyBase64 = readFileContent(sysPubKeyFile);
String privKeyBase64 = readFileContent(appPrivKeyFile);
byte[] encBin = hexStringToBytes(encStr);
Expand All @@ -109,7 +110,7 @@ public static String decrypt(String sysPubKeyFile, String appPrivKeyFile, String
return new String(privDecBin);
}

public static String decrypt(ParamType pubKeyType, String sysPubKey, ParamType privKeyType, String appPrivKey, ParamType passwdType,
public String decrypt(ParamType pubKeyType, String sysPubKey, ParamType privKeyType, String appPrivKey, ParamType passwdType,
String passwd) throws Exception {
String pubKeyBase64 = pubKeyType == ParamType.FILE ? readFileContent(sysPubKey) : sysPubKey;
String privKeyBase64 = privKeyType == ParamType.FILE ? readFileContent(appPrivKey) : appPrivKey;
Expand All @@ -120,15 +121,15 @@ public static String decrypt(ParamType pubKeyType, String sysPubKey, ParamType p
return new String(privDecBin);
}

public static String encrypt(String appPubKeyFile, String sysPrivKeyFile, String passwd) throws Exception {
public String encrypt(String appPubKeyFile, String sysPrivKeyFile, String passwd) throws Exception {
String pubKeyBase64 = readFileContent(appPubKeyFile);
String privKeyBase64 = readFileContent(sysPrivKeyFile);
byte[] pubEncBin = RSAUtils.encryptByPublicKeyBlock(passwd.getBytes(), pubKeyBase64);
byte[] privEncBin = RSAUtils.encryptByPrivateKeyBlock(pubEncBin, privKeyBase64);
return byteToHexString(privEncBin);
}

public static String encrypt(ParamType pubKeyType, String appPubKey, ParamType privKeyType, String sysPrivKey, String passwd) throws Exception {
public String encrypt(ParamType pubKeyType, String appPubKey, ParamType privKeyType, String sysPrivKey, String passwd) throws Exception {
String pubKeyBase64 = pubKeyType == ParamType.FILE ? readFileContent(appPubKey) : appPubKey;
String privKeyBase64 = privKeyType == ParamType.FILE ? readFileContent(sysPrivKey) : sysPrivKey;
byte[] pubEncBin = RSAUtils.encryptByPublicKeyBlock(passwd.getBytes(), pubKeyBase64);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@

import com.alibaba.druid.pool.DruidDataSource;

import lombok.experimental.UtilityClass;

@UtilityClass
public class JdbcUtils {

public static DruidDataSource createDruidDataSource(String url, String userName, String passWord) {
public DruidDataSource createDruidDataSource(String url, String userName, String passWord) {
DruidDataSource dataSource = new DruidDataSource();
dataSource.setUrl(url);
dataSource.setUsername(userName);
Expand Down
Loading