File tree Expand file tree Collapse file tree 9 files changed +174
-6
lines changed
java/com/cevher/ms/person/service
java/com/cevher/ms/salary Expand file tree Collapse file tree 9 files changed +174
-6
lines changed Original file line number Diff line number Diff line change @@ -34,6 +34,16 @@ services:
3434We will use github public repository for our configuration:
3535https://github.com/cevheri/microservices-config-server
3636
37+ ## Kafka Configuration
38+
39+ ---
40+ ### Create Kafka Topic
41+ ` ` ` shell
42+ kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 5 --topic salary
43+ ```
44+ ---
45+
46+
3747
3848---
3949## Development
Original file line number Diff line number Diff line change 8383 <artifactId >springfox-swagger-ui</artifactId >
8484 <version >3.0.0</version >
8585 </dependency >
86-
86+ <dependency >
87+ <groupId >org.springframework.kafka</groupId >
88+ <artifactId >spring-kafka</artifactId >
89+ </dependency >
90+ <dependency >
91+ <groupId >org.springframework.kafka</groupId >
92+ <artifactId >spring-kafka-test</artifactId >
93+ <scope >test</scope >
94+ </dependency >
8795 </dependencies >
8896 <dependencyManagement >
8997 <dependencies >
Original file line number Diff line number Diff line change @@ -20,10 +20,28 @@ public class PersonService {
2020
2121 private final PersonRepository personRepository ;
2222 private final RestTemplate restTemplate ;
23+ private final SalaryProducer salaryProducer ;
24+
25+ private final Double PERSON_DEFAULT_SALARY = 1000D ;
26+
27+ private void sendFirstSalary (Person person ) {
28+ log .info ("sendFirstSalary method of PersonService for CurrentPerson" );
29+
30+ SalaryMessage salaryMessage = SalaryMessage
31+ .builder ()
32+ .personId (person .getId ())
33+ .amount (PERSON_DEFAULT_SALARY )
34+ .build ();
35+ salaryProducer .produce (salaryMessage .toString ());
36+
37+ log .info ("Send Kafka Message: " + salaryMessage );
38+ }
2339
2440 public Person savePerson (Person person ) {
2541 log .info ("savePerson method of PersonService" );
26- return personRepository .save (person );
42+ Person resultPerson = personRepository .save (person );
43+ sendFirstSalary (resultPerson );
44+ return resultPerson ;
2745 }
2846
2947 public ResponseTempVM getPersonWithDepartment (Long personId ) {
Original file line number Diff line number Diff line change 1+ package com .cevher .ms .person .service ;
2+
3+ import lombok .*;
4+ import lombok .extern .slf4j .Slf4j ;
5+ import org .springframework .kafka .core .KafkaTemplate ;
6+ import org .springframework .stereotype .Service ;
7+
8+ import java .io .Serializable ;
9+ import java .util .UUID ;
10+
11+ @ Service
12+ @ Slf4j
13+ @ RequiredArgsConstructor
14+ public class SalaryProducer {
15+ private static final String KAFKA_TOPIC = "salary" ;
16+
17+ private final KafkaTemplate <String , String > kafkaTemplate ;
18+
19+ public void produce (String message ) {
20+ kafkaTemplate .send (KAFKA_TOPIC , message );
21+ }
22+ }
23+
24+
25+ @ AllArgsConstructor
26+ @ NoArgsConstructor
27+ @ ToString
28+ @ EqualsAndHashCode
29+ @ Builder ()
30+ class SalaryMessage implements Serializable {
31+
32+ private String toService ;
33+ private String fromService ;
34+ private String uuid ;
35+
36+ @ Getter
37+ @ Setter
38+ private Long personId ;
39+
40+ @ Getter
41+ @ Setter
42+ private Double amount ;
43+
44+ public String getUuid () {
45+ return UUID .randomUUID ().toString ();
46+ }
47+
48+ public String getFromService () {
49+ return "person-service" ;
50+ }
51+ public String getToService () {
52+ return "salary-service" ;
53+ }
54+ }
Original file line number Diff line number Diff line change 33 name : person-service
44 zipkin :
55 base-url : ${ZIPKIN_URI:http://zipkin:9411/}
6-
6+ kafka :
7+ consumer :
8+ bootstrap-servers : localhost:9092
9+ group-id : salary_group
10+ auto-offset-reset : earliest
11+ key-deserializer : org.apache.kafka.common.serialization.StringDeserializer
12+ value-deserializer : org.apache.kafka.common.serialization.StringDeserializer
13+ producer :
14+ bootstrap-servers : localhost:9092
15+ key-serializer : org.apache.kafka.common.serialization.StringSerializer
16+ value-serializer : org.apache.kafka.common.serialization.StringSerializer
717server :
818 port : 9002
919
@@ -28,4 +38,4 @@ eureka:
2838# inform the Eureka service that client wants to be advertised by IP address.
2939# Personally, we always set this attribute to true. Cloud-based microservices are sup-
3040# posed to be ephemeral and stateless. They can be started up and shut down at will.
31- # IP addresses are more appropriate for these types of services.
41+ # IP addresses are more appropriate for these types of services.
Original file line number Diff line number Diff line change 1414@ ToString
1515@ EqualsAndHashCode
1616@ Entity
17- @ Table (name = "salary" )
17+ @ Table (name = "salary" , catalog = "public" )
1818public class Salary implements Serializable {
1919
2020 @ Id
Original file line number Diff line number Diff line change 1+ package com .cevher .ms .salary .service ;
2+
3+ import lombok .*;
4+ import lombok .extern .slf4j .Slf4j ;
5+ import org .springframework .kafka .annotation .KafkaListener ;
6+ import org .springframework .stereotype .Service ;
7+
8+ import java .io .Serializable ;
9+ import java .util .UUID ;
10+
11+ @ Service
12+ @ Slf4j
13+ @ RequiredArgsConstructor
14+ public class SalaryConsumer {
15+
16+ private static final String KAFKA_TOPIC = "salary" ;
17+ private static final String KAFKA_CONSUMER_GROUP = "salary_group" ;
18+
19+ @ KafkaListener (topics = KAFKA_TOPIC ,
20+ groupId = KAFKA_CONSUMER_GROUP )
21+ public void consume (String message ) {
22+ log .info ("Consumer Group : " + message .toString ());
23+ }
24+ }
25+
26+ @ AllArgsConstructor
27+ @ NoArgsConstructor
28+ @ ToString
29+ @ EqualsAndHashCode
30+ @ Builder ()
31+ class SalaryMessage implements Serializable {
32+
33+ private String toService ;
34+ private String fromService ;
35+ private String uuid ;
36+
37+ @ Getter
38+ @ Setter
39+ private Long personId ;
40+
41+ @ Getter
42+ @ Setter
43+ private Double amount ;
44+
45+ public String getUuid () {
46+ return UUID .randomUUID ().toString ();
47+ }
48+
49+ public String getFromService () {
50+ return "person-service" ;
51+ }
52+
53+ public String getToService () {
54+ return "salary-service" ;
55+ }
56+ }
Original file line number Diff line number Diff line change 1- package com .cevher .ms .salary .rest ;
1+ package com .cevher .ms .salary .web . rest ;
22
33import com .cevher .ms .salary .dto .SalaryDto ;
44import com .cevher .ms .salary .service .SalaryService ;
Original file line number Diff line number Diff line change 33 name : salary-service
44 zipkin :
55 base-url : ${ZIPKIN_URI:http://zipkin:9411/}
6+ kafka :
7+ consumer :
8+ bootstrap-servers : localhost:9092
9+ group-id : salary_group
10+ auto-offset-reset : earliest
11+ key-deserializer : org.apache.kafka.common.serialization.StringDeserializer
12+ value-deserializer : org.apache.kafka.common.serialization.StringDeserializer
13+ producer :
14+ bootstrap-servers : localhost:9092
15+ key-serializer : org.apache.kafka.common.serialization.StringSerializer
16+ value-serializer : org.apache.kafka.common.serialization.StringSerializer
17+
618server :
719 port : 9004
820
You can’t perform that action at this time.
0 commit comments