amqpfind – Multi Server
Usage
In multi server mode, amqpfind makes connections to a multiple RabbitMQ servers. Each connection starts a separate process and listens for messages for the topics specified. Satellite Data Services ingests data from multiple sources. Each of these ingest process may publish identical messages. To avoid receiving duplicate messages, amqpfind uses the argument -k in combination with -w, identifies the duplicate messages and only sends the most recent message to the user.
amqpfind -H -u -p [-t] [-X] [-k] [-w] [-j'{key} {key}’ | ‘{?}’] [-C key | key]
-H | host ip address of the Satellite Data Services RabbitMQ server mq1(2).ssec.wisc.edu | |||
-u | user for SDS RabbitMQ server (sdsuser) | |||
-p | password for SDS RabbitMQ server (sdsmq) | |||
-j | outputs subset of json key values of the message; must be a single quoted string, each key must be surrounded by curly braces; for example: -j ‘{start_time} {band} {status}’ | |||
-j ‘?’ | outputs user friendly lists all keys and values from message | |||
-k | keys checked to see is a message, sent from multiple servers, is considered a duplicate | |||
-w | time window, in second, used to watch for duplicate messages | |||
-t | sets timeout in seconds before amqpfind ends | |||
-X | SDS RabbitMQ server exchange; valid options are satellite or himawari | |||
-C | period-separated list of nine subscription keys; specific values or wild card must be specified for all nine keys | |||
-v | debug level |
Examples
In the examples that follow, a 60 second window is opened to check for duplicate messages from the servers mq1.ssec.wisc.edu and mq2.ssec.wisc.edu. If a second message is received within 60 seconds that matches the start_time, band and coverage, it would be considered a duplicate and not processed by amqpfind.
amqpfind -H mq1.ssec.wisc.edu -u sdsuser -p sdsmq \
-H mq2.ssec.wisc.edu -u sdsuser -p sdsmq \
-X satellite \
-w 60.0 \
-k ‘(start_time,band,coverage)’ \
-C geo.goes.g16.abi.*.*.*.band.end
Output
geo.goes.g16.abi.adde.sdi.ncdf.band.end: ‘{“message_type”: “band”, “status”: “end”, “satellite_family”: “GOES”, “satellite_ID”: “G16”, “medium”: “adde”, “start_time”: “2018-02-02 16:20:34.6”, “coverage”: “Mesoscale-1”, “satellite_location”: “GOES-East”, “create_time”: “2018-02-02 16:20:44.4”, “data_type”: “RadM1”, “instrument”: “ABI”, “end_time”: “2018-02-02 16:20:40.3”, “signal_type”: “grb”, “server_ip”: “satbuf1.ssec.wisc.edu”, “mode”: “3”, “path”: “/data/goes/grb/goes16/2018/2018_02_02_033/abi/L1b/RadM1/OR_ABI-L1b-RadM1-M3C14_G16_s20180331620346_e20180331620403_c20180331620444.nc”, “adde_dataset”: “EASTA/M1”, “title”: “ABI L1b Radiances”, “server_type”: “sdi”, “band”: 14}’
amqpfind -H mq1.ssec.wisc.edu -u sdsuser -p sdsmq \
-H mq2.ssec.wisc.edu -u sdsuser -p sdsmq \
-k ‘(start_time,band,coverage)’ \
-w 60.0 \
-X satellite \
-j ‘{adde_dataset} {start_time} {band}’ \
-C geo.goes.abi.*.*.*.band.end
Output
EASTA/M1 2018-01-23 21:54:29.8 8
Scripting
One method to implement amqpfind is to write a wrapper script. The script can set an environment and start amqpfind. One advantage of using this method is that your processing script can be updated without having to stop and restart the wrapper script.
Wrapper Script code
#! /bin/bash
#
# Script that starts up bash script based on amqpfind messages
#
# Note – when using Full Disk make sure -n is set to 7. Otherwise, ‘Disk’ will be passed to the next call to the script.
export COVERAGE=CONUS
export PATH=$HOME/amqpfind:$PATH
while true; do
amqpfind -H mq1.ssec.wisc.edu -p sdsmq -u sdsuser \
-H mq2.ssec.wisc.edu -p sdsmq -u sdsuser \
-k ‘(start_time, coverage)’ \
-w 60.0 \
-X satellite \
-j ‘{start_time} {adde_dataset} {server_ip} {satellite_location} {coverage}’ \
-C geo.goes.*.abi.*.*.*.image.complete | grep –line-buffered -i ${COVERAGE} | xargs -P1 -n6 $PWD/remap.bash
echo $(date -u) amqpfind died in $0
done