Hi Diaz!
Yes, I remember our call about this. This is how I would start:
- Data from Wazuh is collected by reading logs from /var/ossec/logs/alerts/alerts.json. You can see that in the "Wazuh" pipeline of Logstash. (To see your pipelines, check /etc/logstash/pipelines.yml.)
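For reference, a typical entry in /etc/logstash/pipelines.yml for that pipeline might look like this (the pipeline id and config path below are assumptions, check your own file):
- pipeline.id: wazuh
  path.config: "/etc/logstash/conf.d/wazuh/*.conf"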
So, here is an example entry from Wazuh, from the alerts.json file:
{"timestamp":"2023-09-26T11:16:55.973+0200","rule":{"level":3,"description":"Successful sudo executed.","id":"5407","mitre":{"id":["T1548.003"],"tactic":["Privilege Escalation","Defense Evasion"],"technique":["Sudo and Sudo Caching"]},"firedtimes":53,"mail":false,"groups":["syslog","sudo"],"pci_dss":["10.2.5","10.2.2"],"gpg13":["7.6","7.8","7.13"],"gdpr":["IV_32.2"],"tsc":["CC6.8","CC7.2","CC7.3"]},"agent":{"id":"012","name":"MyHost","ip":"x.x.x.x","labels":{"group":"LINUX_GROUPS","location":"waw"}},"manager":{"name":"energylogserver730"},"id":"1645979815.13235459","cluster":{"name":"wazuh","node":"energylogserver730"},"full_log":"Sep 26 11:16:55 example.els.pl sudo: padre : TTY=unknown ; PWD=/ ; USER=monitor ; COMMAND=/usr/bin/mon check orphans host","predecoder":{"program_name":"sudo","timestamp":"Sep 26 11:16:55","hostname":"example.els.pl"},"decoder":{"parent":"sudo","name":"sudo","ftscomment":"First time user executed the sudo command"},"data":{"srcuser":"padre","dstuser":"monitor","tty":"unknown","pwd":"/","command":"/usr/bin/mon check orphans host"},"location":"/var/log/secure"}
We look for this part: "agent":{"id":"012","name":"MyHost","ip":"x.x.x.x",...}. Here are the agent id, name and ip. We can use any of that information in the parser. Note that in the future this can be even more flexible, as you can use different policies for different customers, and so on.
- Now we move to the parser configuration in /etc/logstash/conf.d/wazuh/01-wazuh.conf. In the input section we see how data is collected. We will add an if statement to the next section, filter. This will allow us to add a new field and use it in the final output section.
filter {
  if [agent][name] == "MyHost" {
    mutate {
      add_field => { "customer_id" => "123" }
    }
  }
}
About the above: if possible, it's generally a good idea to avoid using specific names, to increase data security in case of any breach. That's why my suggestion is to give each customer a unique id and use it during parsing.
The filter just added a new field called customer_id and assigned it a value of 123.
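Since we can key off any agent field, here is a minimal sketch of the same filter matching on [agent][id] instead of the hostname (the agent ids and customer ids below are made up for illustration), with a fallback so the index name never ends up containing a literal %{customer_id} when nothing matches:
filter {
  # Route by agent id instead of agent name (example ids)
  if [agent][id] == "012" {
    mutate { add_field => { "customer_id" => "123" } }
  } else if [agent][id] == "013" {
    mutate { add_field => { "customer_id" => "124" } }
  } else {
    # Fallback so unmatched events still land in a predictable index
    mutate { add_field => { "customer_id" => "unassigned" } }
  }
}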
- We use the newly created field and add an output with a variable in the index name. We move to the output section:
output {
  logserver {
    hosts => ["http://127.0.0.1:9200"]
    ssl => false
    index => "wazuh-alerts-%{[customer_id]}-%{+YYYY.MM}"
    template => "/etc/logstash/templates.d/wazuh-template.json"
    user => "parser"
    password => "parser"
  }
}
This would result in creating an index called wazuh-alerts-123-2023.09, if it were done today.
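To quickly check that the index was created once some events flow through, you can query the cat indices API (assuming the parser credentials are also allowed to read index metadata, otherwise use an admin user):
curl -u parser:parser "http://127.0.0.1:9200/_cat/indices/wazuh-alerts-*?v"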
- We can move to the UI, to the Config module, and create roles that will have access to this index, based on the customer id, which is 123.
After that we can assign the newly created role to that customer's user, and that's about it.
Any visualizations based on the index pattern wazuh-alerts* would also try to visualize data from that customer, but this flexibility allows us to limit data access for that specific role to data only from wazuh-alerts-123-..., so there is no need to create new objects for each customer.
Hope that helps, let me know if you run into any trouble 💪