1) Project Service:发布事件(Producer / Publish)
1.1 Event DTO(Data Transfer Object)
// ProjectApprovedEvent.java
/**
 * Event payload describing an approved project.
 *
 * <p>Plain mutable bean: a public no-arg constructor plus a getter/setter pair
 * for every field, so it can be populated field by field and read back by
 * callers or by a bean-style JSON mapper.
 */
public class ProjectApprovedEvent {

    /** Identifier of this event instance. */
    private String eventId;

    /** Event discriminator, e.g. {@code "PROJECT_APPROVED"}. */
    private String eventType;

    /** Identifier of the project the event refers to. */
    private String projectId;

    /** Time the event occurred, as an ISO time string. */
    private String occurredAt;

    /** Identifier of the vendor attached to the project. */
    private String vendorId;

    /** Budget as a whole number; the field name indicates the unit is cents. */
    private long budgetCents;

    /** Start date formatted as {@code "YYYY-MM-DD"}. */
    private String startDate;

    /** End date; NOTE(review): presumably the same format as startDate - confirm. */
    private String endDate;

    /** Creates an empty event; every field keeps its Java default value. */
    public ProjectApprovedEvent() {}

    public String getEventId() {
        return eventId;
    }

    public void setEventId(String eventId) {
        this.eventId = eventId;
    }

    public String getEventType() {
        return eventType;
    }

    public void setEventType(String eventType) {
        this.eventType = eventType;
    }

    public String getProjectId() {
        return projectId;
    }

    public void setProjectId(String projectId) {
        this.projectId = projectId;
    }

    public String getOccurredAt() {
        return occurredAt;
    }

    public void setOccurredAt(String occurredAt) {
        this.occurredAt = occurredAt;
    }

    public String getVendorId() {
        return vendorId;
    }

    public void setVendorId(String vendorId) {
        this.vendorId = vendorId;
    }

    public long getBudgetCents() {
        return budgetCents;
    }

    public void setBudgetCents(long budgetCents) {
        this.budgetCents = budgetCents;
    }

    public String getStartDate() {
        return startDate;
    }

    public void setStartDate(String startDate) {
        this.startDate = startDate;
    }

    public String getEndDate() {
        return endDate;
    }

    public void setEndDate(String endDate) {
        this.endDate = endDate;
    }
}
1.2 Producer:把事件 publish 到 topic
// ProjectEventProducer.java
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import java.time.Instant;
import java.util.UUID;
/**
 * Publishes project lifecycle events to the {@code project-events} topic as
 * JSON strings.
 */
@Service
public class ProjectEventProducer {

    private static final String TOPIC = "project-events";

    private final KafkaTemplate<String, String> kafkaTemplate;

    public ProjectEventProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    /**
     * Builds a {@code PROJECT_APPROVED} event and sends it to the topic.
     *
     * <p>A random UUID is generated as the event id and the current instant is
     * used as {@code occurredAt}. String arguments are JSON-escaped before being
     * embedded, so quotes, backslashes and control characters cannot break or
     * alter the payload structure. A {@code null} argument is written as JSON
     * {@code null} rather than as the text {@code "null"}.
     *
     * @param projectId   project identifier; also used as the record key
     * @param vendorId    vendor identifier
     * @param budgetCents budget, written as a bare JSON number
     * @param startDate   start date string (the event DTO documents "YYYY-MM-DD")
     * @param endDate     end date string
     */
    public void publishProjectApproved(String projectId, String vendorId,
            long budgetCents, String startDate, String endDate) {
        String eventId = UUID.randomUUID().toString();
        String occurredAt = Instant.now().toString();

        // Simplified: JSON is assembled by hand (a real project would rather use
        // a JSON library such as ObjectMapper); every string goes through
        // jsonString() so the result stays well-formed.
        String json = "{"
                + "\"eventId\":" + jsonString(eventId) + ","
                + "\"eventType\":\"PROJECT_APPROVED\","
                + "\"projectId\":" + jsonString(projectId) + ","
                + "\"occurredAt\":" + jsonString(occurredAt) + ","
                + "\"vendorId\":" + jsonString(vendorId) + ","
                + "\"budgetCents\":" + budgetCents + ","
                + "\"startDate\":" + jsonString(startDate) + ","
                + "\"endDate\":" + jsonString(endDate)
                + "}";

        // Keyed by projectId so that events of one project tend to land in the
        // same partition, in order.
        // NOTE(review): the value returned by send() is not observed here, so a
        // failed send goes unnoticed by this method - confirm that is intended.
        kafkaTemplate.send(TOPIC, projectId, json);
    }

    /**
     * Encodes a Java string as a JSON string literal including the surrounding
     * quotes, or as the literal {@code null} when {@code value} is null.
     */
    private static String jsonString(String value) {
        if (value == null) {
            return "null";
        }
        StringBuilder out = new StringBuilder(value.length() + 2);
        out.append('"');
        for (int i = 0; i < value.length(); i++) {
            char c = value.charAt(i);
            switch (c) {
                case '"':
                    out.append("\\\"");
                    break;
                case '\\':
                    out.append("\\\\");
                    break;
                case '\n':
                    out.append("\\n");
                    break;
                case '\r':
                    out.append("\\r");
                    break;
                case '\t':
                    out.append("\\t");
                    break;
                case '\b':
                    out.append("\\b");
                    break;
                case '\f':
                    out.append("\\f");
                    break;
                default:
                    if (c < 0x20) {
                        // Remaining control characters must be \\u-escaped in JSON.
                        out.append(String.format("\\u%04x", (int) c));
                    } else {
                        out.append(c);
                    }
            }
        }
        out.append('"');
        return out.toString();
    }
}
2) Contract Service:订阅并消费(Consumer / Subscribe)
2.1 Listener:订阅 topic 并处理事件
// ContractEventConsumer.java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import java.util.UUID;
/**
 * Consumes messages from the {@code project-events} topic and creates a draft
 * contract for each approved project.
 */
@Service
public class ContractEventConsumer {

    private final ContractRepository contractRepository;

    public ContractEventConsumer(ContractRepository contractRepository) {
        this.contractRepository = contractRepository;
    }

    /**
     * Handles one raw JSON message from the topic.
     *
     * <p>Messages whose {@code eventType} is not {@code PROJECT_APPROVED} are
     * ignored, as are messages without a usable {@code projectId} and messages
     * for a project that already has a contract. Otherwise a new contract in
     * status {@code DRAFT} is saved.
     *
     * @param messageJson the record value as a JSON string
     */
    // This annotation is the subscription: the contract service subscribes to
    // "project-events" under consumer group "contract-service".
    @KafkaListener(topics = "project-events", groupId = "contract-service")
    public void onMessage(String messageJson) {
        String eventType = JsonUtil.getString(messageJson, "eventType");
        if (!"PROJECT_APPROVED".equals(eventType)) {
            return;
        }

        String projectId = JsonUtil.getString(messageJson, "projectId");
        if (projectId == null || projectId.trim().isEmpty()) {
            // Without a project id the event cannot be tied to a project; saving
            // it would create an orphan contract and make the idempotency check
            // below match on "no project".
            // NOTE(review): the message is dropped silently, like the other
            // ignored cases in this method - consider logging or dead-lettering.
            return;
        }

        // Simplified idempotency: only one contract is allowed per projectId.
        // NOTE(review): exists-then-save is not atomic; a unique constraint on
        // projectId would be needed to rule out duplicates - confirm the schema.
        if (contractRepository.existsByProjectId(projectId)) {
            return;
        }

        contractRepository.save(newDraftContract(messageJson, projectId));
    }

    /** Builds an unsaved DRAFT contract from the event fields; the contract id is generated here. */
    private static Contract newDraftContract(String messageJson, String projectId) {
        Contract contract = new Contract();
        contract.setContractId(UUID.randomUUID().toString());
        contract.setProjectId(projectId);
        contract.setVendorId(JsonUtil.getString(messageJson, "vendorId"));
        contract.setBudgetCents(JsonUtil.getLong(messageJson, "budgetCents"));
        contract.setStartDate(JsonUtil.getString(messageJson, "startDate"));
        contract.setEndDate(JsonUtil.getString(messageJson, "endDate"));
        contract.setStatus("DRAFT");
        return contract;
    }
}