Recipe: Streaming the FHIR Audit from the IBM FHIR Server with Go

The IBM FHIR Server supports audit events for FHIR interactions (CREATE-READ-UPDATE-DELETE-SEARCH-EXTENDED_OPERATION) in Cloud Auditing Data Federation (CADF) and HL7 FHIR AuditEvent and pushing the events to an Apache Kafka backend. You can read more about it in another post I made.

This recipe shows how to stream the data with Go-Kafka in a small lightweight library and decode the BASE64 content embedded in CADF.

Let’s spin up an IBM FHIR Server with fhir-audit and how to stream and decode the important content.

Recipe

  1. Log in to the IBM Cloud Console
  2. Click Create Resource
Click Create Resource (on right)

3. Search for Event Streams, and click on Event Streams

Search and Click on Event Streams

4. Choose your Location. I chose us-south. Pick the data center that is closest or co-located with your IBM FHIR Server.

5. Select the Standard plan. A typical bundle which represents a patient history, such as Antonia30_Acosta403.json, include hundreds of resources which correspond to many messages sent over the topic when processed as a Bundle batch. For a Bundle transaction, you get only one notification message.

6. Click I Accept

7. Click Create. You are redirected to the EventStreams resource that is created.

Your Service is Created

8. Click Service Credentials

9. Click New Credentials

10. Click Add

Create Credentials Dialog

11. Copy the Service Credentials and paste it locally (on the right hand-side)

Copy Credentials

12. Click on Topics

13. Click Create Topic

14. Enter Topic Name – FHIR_AUDIT

15. Click Next

Enter Topic Name

16. Select the default number of partitions

Select Number of Partitions

17. Click Next

18. Select Message Retention – 1 Day

19. Click Create Topic

Create Topic

With the IBM EventStreams Kafka Topic setup, it’s now time to connect the IBM FHIR Server and the FHIR Audit module to Kafka.

20. Download the fhir-server-config.json

curl -L https://raw.githubusercontent.com/IBM/FHIR/main/fhir-server-webapp/src/main/liberty/config/config/default/fhir-server-config-audit-environment.json -o fhir-server-config-audit-environment.json

The file contains a setting for loading the properties from the Environment (e.g. Operation System variables) and some basic settings:

        "audit": {
            "serviceClassName" : "com.ibm.fhir.audit.impl.KafkaService",
            "serviceProperties" : {
                "auditTopic": "FHIR_AUDIT",
                "geoCity": "Dallas",
                "geoState": "TX",
                "geoCounty": "US"
            }
        }

21. Download the docker image

docker pull ibmcom/ibm-fhir-server:latest

22. Convert the copied Service Credentials into a file – ibm-creds.json

23. Make the JSON a single line

cat ibm-creds.json | tr -d '\n'

The output is:

{  "api_key": "credentialapikey",  "apikey": "credentialapikey",  "iam_apikey_description": "Auto-generated for key 01ba3165-85d9-410b-ad1a-1111",  "iam_apikey_name": "Service credentials-1",  "iam_role_crn": "crn:v1:bluemix:public:iam::::serviceRole:Manager",  "iam_serviceid_crn": "crn:v1:bluemix:public:iam-identity::a/11111::serviceid:ServiceId-864b549f-ff90-4b24-84f2-1111",  "instance_id": "6b00cc8c-26ec-4c42-a1e8-f4da5b9f71e7",  "kafka_admin_url": "https://admin.eventstreams.cloud.ibm.com",  "kafka_brokers_sasl": [    "broker-3.eventstreams.cloud.ibm.com:9093",    "broker-5.eventstreams.cloud.ibm.com:9093",    "broker-1.eventstreams.cloud.ibm.com:9093",    "broker-2.eventstreams.cloud.ibm.com:9093",    "broker-4.eventstreams.cloud.ibm.com:9093",    "broker-0.eventstreams.cloud.ibm.com:9093"  ],  "kafka_http_url": "https://admin.eventstreams.cloud.ibm.com",  "password": "credentialapikey",  "user": "token"}

24. Export ES_CONFIG

export ES_CONFIG='{  "api_key": "credentialapikey",  "apikey": "credentialapikey",  "iam_apikey_description": "Auto-generated for key 01ba3165-85d9-410b-ad1a-1111",  "iam_apikey_name": "Service credentials-1",  "iam_role_crn": "crn:v1:bluemix:public:iam::::serviceRole:Manager",  "iam_serviceid_crn": "crn:v1:bluemix:public:iam-identity::a/11111::serviceid:ServiceId-864b549f-ff90-4b24-84f2-1111",  "instance_id": "6b00cc8c-26ec-4c42-a1e8-f4da5b9f71e7",  "kafka_admin_url": "https://admin.eventstreams.cloud.ibm.com",  "kafka_brokers_sasl": [    "broker-3.eventstreams.cloud.ibm.com:9093",    "broker-5.eventstreams.cloud.ibm.com:9093",    "broker-1.eventstreams.cloud.ibm.com:9093",    "broker-2.eventstreams.cloud.ibm.com:9093",    "broker-4.eventstreams.cloud.ibm.com:9093",    "broker-0.eventstreams.cloud.ibm.com:9093"  ],  "kafka_http_url": "https://admin.eventstreams.cloud.ibm.com",  "password": "credentialapikey",  "user": "token"}'

Note the single quotes surround the above.

25. Start up the IBM FHIR Server with the EventStreams credentials and fhir-server-config.json we just downloaded.

docker run --rm -d -p 9443:9443 -e BOOTSTRAP_DB=true \
    -v $(pwd)/fhir-server-config-audit-environment.json:/config/config/default/fhir-server-config.json \
    -e EVENT_STREAMS_AUDIT_BINDING="${ES_CONFIG}" ibmcom/ibm-fhir-server

You see the container id output 60a5f1cae6d677d80772f1736db1be74836a8a4845fcccc81286b7c557bc2d86.

26. Check that the applications are started using the container id.

$ docker logs 60a | grep -i started
[4/21/21, 20:55:58:449 UTC] 00000001 FrameworkMana I   CWWKE0002I: The kernel started after 1.43 seconds
[4/21/21, 20:55:58:464 UTC] 0000002a FeatureManage I   CWWKF0007I: Feature update started.
[4/21/21, 20:56:01:328 UTC] 00000030 AppMessageHel A   CWWKZ0001I: Application fhir-openapi started in 1.588 seconds.
[4/21/21, 20:56:03:141 UTC] 00000031 AppMessageHel A   CWWKZ0001I: Application fhir-bulkdata-webapp started in 3.402 seconds.
[4/21/21, 20:56:07:824 UTC] 0000002d AppMessageHel A   CWWKZ0001I: Application fhir-server-webapp started in 7.871 seconds.
[4/21/21, 20:56:07:868 UTC] 0000002a TCPPort       I   CWWKO0219I: TCP Channel defaultHttpEndpoint-ssl has been started and is now listening for requests on host *  (IPv4) port 9443.
[4/21/21, 20:56:07:880 UTC] 0000002a FeatureManage A   CWWKF0011I: The defaultServer server is ready to run a smarter planet. The defaultServer server started in 10.885 seconds.

27. Download the Sample Data

curl -L https://raw.githubusercontent.com/IBM/FHIR/main/fhir-server-test/src/test/resources/testdata/everything-operation/Antonia30_Acosta403.json -o Antonia30_Acosta403.json

28. Load the Sample Data bundle to the IBM FHIR Server

curl -k --location --request POST 'https://localhost:9443/fhir-server/api/v4' \
--header 'Content-Type: application/fhir+json' \
-u "fhiruser:change-password" \
--data-binary  "@Antonia30_Acosta403.json" -o response.json

29. Scan the response.json for any status that is not "status": "201". For example, the status is in the family of User Request Error or Server Side Error.

30. Clone the repository fhir-kafka-go

31. Clone the repository

git clone https://github.com/prb112/fhir-kafka-go.git

32. Build the Go guild

go build

33. Run the fhir-kakfka-go library

./fhir-kafka-go $(cat ibm-creds.json | jq -r '.kafka_brokers_sasl | join(",")') "token" $(cat ibm-creds.json | jq -r '.password') FHIR_AUDIT

You’ll see the following output:

2022/02/18 12:53:02 The audit details are:  t${
    "request_unique_id": "8b7ebb8f-3b48-4633-92d3-4e1d423a3aa2",
    "action": "C",
    "start_time": "2022-02-18 17:52:22.444",
    "end_time": "2022-02-18 17:52:22.525",
    "api_parameters": {
        "request": "/fhir-server/api/v4/Patient",
        "request_status": 201
    },
    "data": {
        "resource_type": "Patient",
        "id": "17f0df6a5b6-1d8db4d7-5042-4742-a951-6c3a3d08ee34",
        "version_id": "1"
    },
    "event_type": "fhir-create",
    "description": "FHIR Create request",
    "location": "172.17.0.1/_gateway"
}

This tool uses Go-Kafka and a few extra tools to process Kafka Messages and help tail the messages.

I hope this works for you, it’s great to work with Go and Kafka.


by

Comments

Leave a Reply

Your email address will not be published. Required fields are marked *

This site uses Akismet to reduce spam. Learn how your comment data is processed.