001// -------------------------------------------------------------------------------- 002// Copyright 2002-2026 Echo Three, LLC 003// 004// Licensed under the Apache License, Version 2.0 (the "License"); 005// you may not use this file except in compliance with the License. 006// You may obtain a copy of the License at 007// 008// http://www.apache.org/licenses/LICENSE-2.0 009// 010// Unless required by applicable law or agreed to in writing, software 011// distributed under the License is distributed on an "AS IS" BASIS, 012// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013// See the License for the specific language governing permissions and 014// limitations under the License. 015// -------------------------------------------------------------------------------- 016 017package com.echothree.util.server.kafka; 018 019import com.echothree.model.control.core.server.control.EntityInstanceControl; 020import com.echothree.model.data.core.server.entity.Event; 021import com.echothree.util.server.persistence.Session; 022import com.echothree.util.server.string.EntityInstanceUtils; 023import com.google.common.net.MediaType; 024import com.google.gson.Gson; 025import com.google.gson.GsonBuilder; 026import fish.payara.cloud.connectors.kafka.api.KafkaConnectionFactory; 027import java.nio.charset.StandardCharsets; 028import javax.annotation.Resource; 029import javax.enterprise.context.ApplicationScoped; 030import org.apache.http.HttpHeaders; 031import org.apache.kafka.clients.producer.ProducerRecord; 032import org.apache.kafka.common.header.Headers; 033import org.apache.kafka.common.header.internals.RecordHeader; 034import org.apache.kafka.common.header.internals.RecordHeaders; 035 036@ApplicationScoped 037public class EventTopic { 038 039 @Resource(name = "java:/KafkaConnectionFactory") 040 KafkaConnectionFactory kafkaConnectionFactory; 041 042 private static final String TOPIC = "echothree-events-json"; 043 044 private final Gson gson = new GsonBuilder().serializeNulls().create(); 045 046 private final Headers HEADERS_JSON = new RecordHeaders() 047 .add(new RecordHeader(HttpHeaders.CONTENT_TYPE, MediaType.JSON_UTF_8.toString().getBytes(StandardCharsets.UTF_8))); 048 049 protected EventTopic() {} 050 051 public void sendEvent(Event event) { 052 if(kafkaConnectionFactory != null) { 053 try { 054 try(var kafkaConnection = kafkaConnectionFactory.createConnection()) { 055 var entityInstanceControl = Session.getModelController(EntityInstanceControl.class); 056 var eventId = event.getPrimaryKey().getEntityId(); 057 var eventTime = event.getEventTime(); 058 var eventTimeSequence = event.getEventTimeSequence(); 059 var entityInstance = entityInstanceControl.ensureUuidForEntityInstance(event.getEntityInstance(), false); 060 var entityRef = EntityInstanceUtils.getEntityRefByEntityInstance(entityInstance); 061 var id = entityInstance.getUuid(); 062 var eventTypeName = event.getEventType().getEventTypeName(); 063 var relatedEntityRef = EntityInstanceUtils.getEntityRefByEntityInstance(event.getRelatedEntityInstance()); 064 var relatedEventType = event.getRelatedEventType(); 065 var relatedEventTypeName = relatedEventType == null ? null : relatedEventType.getEventTypeName(); 066 var createdByEntityRef = EntityInstanceUtils.getEntityRefByEntityInstance(event.getCreatedBy()); 067 068// var value = "eventId = " + eventId 069// + ", eventTime = " + eventTime 070// + ", eventTimeSequence = " + eventTimeSequence 071// + ", entityRef = " + entityRef 072// + ", id = " + id 073// + ", eventType = " + eventTypeName 074// + ", relatedEntityInstance = " + relatedEntityRef 075// + ", relatedEventType = " + relatedEventTypeName 076// + ", createdByEntityInstance = " + createdByEntityRef; 077 078// var eventValue = com.echothree.model.avro.core.common.Event.newBuilder() 079// .setEventId(eventId) 080// .setEventTime(eventTime) 081// .setEventTimeSequence(eventTimeSequence) 082// .setEntityRef(entityRef) 083// .setId(id) 084// .setEventTypeName(eventTypeName) 085// .setRelatedEntityRef(relatedEntityRef) 086// .setRelatedEventTypeName(relatedEventTypeName) 087// .setCreatedByEntityRef(createdByEntityRef) 088// .build(); 089 090 var eventJsonObject = new com.echothree.model.control.core.server.kafka.Event(eventId, eventTime, 091 eventTimeSequence, entityRef, id, eventTypeName, relatedEntityRef, relatedEventTypeName, 092 createdByEntityRef); 093 var eventJson = gson.toJson(eventJsonObject); 094 095 var future = kafkaConnection.send(new ProducerRecord<>(TOPIC, null, 096 eventTime, entityRef, eventJson, HEADERS_JSON)); 097 098 future.get(); 099 } 100 } catch(Exception e) { 101 throw new RuntimeException(e); 102 } 103 } 104 } 105 106}