001// -------------------------------------------------------------------------------- 002// Copyright 2002-2024 Echo Three, LLC 003// 004// Licensed under the Apache License, Version 2.0 (the "License"); 005// you may not use this file except in compliance with the License. 006// You may obtain a copy of the License at 007// 008// http://www.apache.org/licenses/LICENSE-2.0 009// 010// Unless required by applicable law or agreed to in writing, software 011// distributed under the License is distributed on an "AS IS" BASIS, 012// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013// See the License for the specific language governing permissions and 014// limitations under the License. 015// -------------------------------------------------------------------------------- 016 017package com.echothree.util.server.kafka; 018 019import com.echothree.model.control.core.server.control.CoreControl; 020import com.echothree.model.data.core.server.entity.Event; 021import com.echothree.util.server.persistence.Session; 022import com.echothree.util.server.string.EntityInstanceUtils; 023import com.google.common.base.Charsets; 024import com.google.common.net.MediaType; 025import com.google.gson.Gson; 026import com.google.gson.GsonBuilder; 027import fish.payara.cloud.connectors.kafka.api.KafkaConnectionFactory; 028import org.apache.http.HttpHeaders; 029import org.apache.kafka.clients.producer.ProducerRecord; 030import org.apache.kafka.common.header.Headers; 031import org.apache.kafka.common.header.internals.RecordHeader; 032import org.apache.kafka.common.header.internals.RecordHeaders; 033 034public class EventTopic { 035 036 private static final String TOPIC = "echothree-events-json"; 037 038 private static final EventTopic instance = new EventTopic(); 039 040 public static EventTopic getInstance() { 041 return instance; 042 } 043 044 KafkaConnectionFactory kafkaConnectionFactory = KafkaConnectionFactoryResource.getInstance().getKafkaConnectionFactory(); 045 EntityInstanceUtils entityInstanceUtils = EntityInstanceUtils.getInstance(); 046 Gson gson = new GsonBuilder().serializeNulls().create(); 047 048 Headers HEADERS_JSON = new RecordHeaders() 049 .add(new RecordHeader(HttpHeaders.CONTENT_TYPE, MediaType.JSON_UTF_8.toString().getBytes(Charsets.UTF_8))); 050 051 protected EventTopic() {} 052 053 public void sendEvent(Event event) { 054 if(kafkaConnectionFactory != null) { 055 try { 056 try(var kafkaConnection = kafkaConnectionFactory.createConnection()) { 057 var coreControl = Session.getModelController(CoreControl.class); 058 var eventId = event.getPrimaryKey().getEntityId(); 059 var eventTime = event.getEventTime(); 060 var eventTimeSequence = event.getEventTimeSequence(); 061 var entityInstance = coreControl.ensureUlidForEntityInstance(event.getEntityInstance(), false); 062 var entityRef = entityInstanceUtils.getEntityRefByEntityInstance(entityInstance); 063 var id = entityInstance.getUlid(); 064 var eventTypeName = event.getEventType().getEventTypeName(); 065 var relatedEntityRef = entityInstanceUtils.getEntityRefByEntityInstance(event.getRelatedEntityInstance()); 066 var relatedEventType = event.getRelatedEventType(); 067 var relatedEventTypeName = relatedEventType == null ? null : relatedEventType.getEventTypeName(); 068 var createdByEntityRef = entityInstanceUtils.getEntityRefByEntityInstance(event.getCreatedBy()); 069 070// var value = "eventId = " + eventId 071// + ", eventTime = " + eventTime 072// + ", eventTimeSequence = " + eventTimeSequence 073// + ", entityRef = " + entityRef 074// + ", id = " + id 075// + ", eventType = " + eventTypeName 076// + ", relatedEntityInstance = " + relatedEntityRef 077// + ", relatedEventType = " + relatedEventTypeName 078// + ", createdByEntityInstance = " + createdByEntityRef; 079 080// var eventValue = com.echothree.model.avro.core.common.Event.newBuilder() 081// .setEventId(eventId) 082// .setEventTime(eventTime) 083// .setEventTimeSequence(eventTimeSequence) 084// .setEntityRef(entityRef) 085// .setId(id) 086// .setEventTypeName(eventTypeName) 087// .setRelatedEntityRef(relatedEntityRef) 088// .setRelatedEventTypeName(relatedEventTypeName) 089// .setCreatedByEntityRef(createdByEntityRef) 090// .build(); 091 092 var eventJsonObject = new com.echothree.model.control.core.server.kafka.Event(eventId, eventTime, 093 eventTimeSequence, entityRef, id, eventTypeName, relatedEntityRef, relatedEventTypeName, 094 createdByEntityRef); 095 var eventJson = gson.toJson(eventJsonObject); 096 097 var future = kafkaConnection.send(new ProducerRecord<>(TOPIC, null, 098 eventTime, entityRef, eventJson, HEADERS_JSON)); 099 100 future.get(); 101 } 102 } catch(Exception e) { 103 throw new RuntimeException(e); 104 } 105 } 106 } 107 108}