1) Project Service:发布事件(Producer / Publish)

1.1 Event DTO(Data Transfer Object)

// ProjectApprovedEvent.java
public class ProjectApprovedEvent {
    private String eventId;
    private String eventType;   // "PROJECT_APPROVED"
    private String projectId;
    private String occurredAt;  // ISO time
    private String vendorId;
    private long budgetCents;
    private String startDate;   // "YYYY-MM-DD"
    private String endDate;
 
    public ProjectApprovedEvent() {}
 
    // getters/setters 省略(实际可用 Lombok @Data)
}
 

1.2 Producer:把事件 publish 到 topic

// ProjectEventProducer.java
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
 
import java.time.Instant;
import java.util.UUID;
 
@Service
public class ProjectEventProducer {
 
    private final KafkaTemplate<String, String> kafkaTemplate;
    private static final String TOPIC = "project-events";
 
    public ProjectEventProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }
 
    public void publishProjectApproved(String projectId, String vendorId,
                                       long budgetCents, String startDate, String endDate) {
 
        String eventId = UUID.randomUUID().toString();
        String occurredAt = Instant.now().toString();
 
        // 简化:手写 JSON(真实项目建议用 ObjectMapper)
        String json = "{"
                + "\"eventId\":\"" + eventId + "\","
                + "\"eventType\":\"PROJECT_APPROVED\","
                + "\"projectId\":\"" + projectId + "\","
                + "\"occurredAt\":\"" + occurredAt + "\","
                + "\"vendorId\":\"" + vendorId + "\","
                + "\"budgetCents\":" + budgetCents + ","
                + "\"startDate\":\"" + startDate + "\","
                + "\"endDate\":\"" + endDate + "\""
                + "}";
 
        // key 用 projectId:保证同一 project 的事件尽量按顺序进同一个 partition
        kafkaTemplate.send(TOPIC, projectId, json);
    }
}
 

2) Contract Service:订阅并消费(Consumer / Subscribe)

2.1 Listener:订阅 topic 并处理事件

// ContractEventConsumer.java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
 
import java.util.UUID;
 
@Service
public class ContractEventConsumer {
 
    private final ContractRepository contractRepository;
 
    public ContractEventConsumer(ContractRepository contractRepository) {
        this.contractRepository = contractRepository;
    }
 
    // ✅ 这行就是“订阅”:Contract Service subscribe to "project-events"
    @KafkaListener(topics = "project-events", groupId = "contract-service")
    public void onMessage(String messageJson) {
 
        String eventType = JsonUtil.getString(messageJson, "eventType");
        if (!"PROJECT_APPROVED".equals(eventType)) {
            return;
        }
 
        String projectId = JsonUtil.getString(messageJson, "projectId");
 
        // ✅ 简化版幂等:同一个 projectId 只允许一份合同
        if (contractRepository.existsByProjectId(projectId)) {
            return;
        }
 
        // 创建合同:contractId 由 Contract Service 生成
        Contract c = new Contract();
        c.setContractId(UUID.randomUUID().toString());
        c.setProjectId(projectId);
        c.setVendorId(JsonUtil.getString(messageJson, "vendorId"));
        c.setBudgetCents(JsonUtil.getLong(messageJson, "budgetCents"));
        c.setStartDate(JsonUtil.getString(messageJson, "startDate"));
        c.setEndDate(JsonUtil.getString(messageJson, "endDate"));
        c.setStatus("DRAFT");
 
        contractRepository.save(c);
    }
}