Yes - Logstash Aggregate Filter
The recommended approach is to use the Logstash aggregate filter together with the elasticsearch input plugin.
This solution:
- Reads from multiple indices in a single scheduled query
- Correlates events in memory by a common key (username)
- Creates a merged document once both sources have been seen for that key
- Writes the merged documents to a dedicated correlation index
Working Configuration
Create file: /etc/logstash/conf.d/correlation/jira-windows.conf
input {
  elasticsearch {
    hosts => ["127.0.0.1:9200"]
    index => "jira-*,windows-*"
    query => '{"query": {"range": {"@timestamp": {"gte": "now-1h"}}}}'
    schedule => "*/5 * * * *"
    user => "${LOGSTASH_USER}"
    password => "${LOGSTASH_PASS}"
  }
}
filter {
  # Identify the source of each event
  if [task_id] {
    mutate { add_field => { "source_type" => "jira" } }
  } else if [event_id] {
    mutate { add_field => { "source_type" => "windows" } }
  }

  # Correlate by username (the aggregate filter requires pipeline.workers: 1)
  aggregate {
    task_id => "%{username}"
    code => "
      map['username'] ||= event.get('username')
      if event.get('source_type') == 'jira'
        map['jira_task'] = event.get('task_id')
        map['jira_timestamp'] = event.get('@timestamp')
      end
      if event.get('source_type') == 'windows'
        map['windows_host'] = event.get('host')
        map['windows_timestamp'] = event.get('@timestamp')
      end
      # Create merged document when both sources exist
      if map['jira_task'] && map['windows_host']
        event.set('correlated', true)
        event.set('jira_task_id', map['jira_task'])
        event.set('jira_timestamp', map['jira_timestamp'])
        event.set('windows_hostname', map['windows_host'])
        event.set('windows_timestamp', map['windows_timestamp'])
      end
    "
    timeout => 1800 # 30 minutes, counted from the first event for each username
  }

  # Keep only merged documents
  if ![correlated] {
    drop {}
  }
}
output {
  logserver {
    hosts => ["127.0.0.1:9200"]
    index => "correlation-jira-windows-%{+YYYY.MM.dd}"
    user => "${LOGSTASH_USER}"
    password => "${LOGSTASH_PASS}"
  }
}
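To make the aggregate code easier to reason about, here is a minimal Python sketch that replays events in order against one dictionary per username, mirroring the Ruby map logic and the final drop. It is an approximation under stated assumptions: it ignores the 30-minute timeout, assumes events arrive in the order given, and the function names and sample events are hypothetical.

```python
from typing import Any, Dict, List, Optional


def source_type(event: Dict[str, Any]) -> Optional[str]:
    """Mirror the filter's source detection: task_id => jira, event_id => windows."""
    if "task_id" in event:
        return "jira"
    if "event_id" in event:
        return "windows"
    return None


def correlate(events: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Replay events in order and return only those the pipeline would keep."""
    maps: Dict[str, Dict[str, Any]] = {}  # one map per username, like the aggregate filter
    kept: List[Dict[str, Any]] = []
    for event in events:
        user = event.get("username")
        if user is None:
            # No username: aggregate does nothing and the event is later dropped
            continue
        state = maps.setdefault(user, {"username": user})
        kind = source_type(event)
        if kind == "jira":
            state["jira_task"] = event.get("task_id")
            state["jira_timestamp"] = event.get("@timestamp")
        elif kind == "windows":
            state["windows_host"] = event.get("host")
            state["windows_timestamp"] = event.get("@timestamp")
        # Same condition as the Ruby code: both sources seen for this username
        if state.get("jira_task") is not None and state.get("windows_host") is not None:
            merged = dict(event)
            merged.update(
                correlated=True,
                jira_task_id=state["jira_task"],
                jira_timestamp=state["jira_timestamp"],
                windows_hostname=state["windows_host"],
                windows_timestamp=state["windows_timestamp"],
            )
            kept.append(merged)
        # Anything not marked correlated is discarded, like the drop {} branch
    return kept


# Hypothetical sample events
events = [
    {"username": "alice", "task_id": "OPS-1", "@timestamp": "2024-01-01T10:00:00Z"},
    {"username": "bob", "event_id": 4624, "host": "ws-02", "@timestamp": "2024-01-01T10:01:00Z"},
    {"username": "alice", "event_id": 4624, "host": "ws-01", "@timestamp": "2024-01-01T10:05:00Z"},
]
for doc in correlate(events):
    print(doc["username"], doc["jira_task_id"], doc["windows_hostname"])  # prints: alice OPS-1 ws-01
```

Once both sources have been seen for a username, every later event for that username also passes the check, so repeated events produce repeated merged documents until the map times out.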
Key Parameters
- schedule: data polling frequency. Recommended: */5 * * * * (every 5 minutes)
- timeout: correlation window, counted from the first event for a given key. Recommended: 1800 seconds (30 minutes)
- task_id: grouping key. Recommended: username, session_id, or source IP
- query: source filter. Recommended: the last hour (now-1h)
Deployment Steps
1. Validate syntax
/usr/share/logstash/bin/logstash --config.test_and_exit -f /etc/logstash/conf.d/correlation/jira-windows.conf
2. Add to pipelines.yml
Add this to /etc/logstash/pipelines.yml:
- pipeline.id: jira-windows-correlation
  path.config: "/etc/logstash/conf.d/correlation/jira-windows.conf"
  pipeline.workers: 1   # the aggregate filter requires a single worker
3. Restart Logstash
systemctl restart logstash
4. Verify results
curl -X GET "localhost:9200/correlation-jira-windows-*/_search?pretty&size=1"
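The same check can be scripted. The sketch below uses only the Python standard library and two hypothetical helpers: search_correlations queries the correlation-jira-windows-* indices on localhost:9200, reusing the LOGSTASH_USER and LOGSTASH_PASS variables for basic auth (an assumption: that account also needs read access to those indices), and summarize_hits extracts the merged fields written by the aggregate filter from a _search response.

```python
import base64
import json
import os
import urllib.request
from typing import Any, Dict, List

# Fields set by the aggregate code, plus the grouping key
MERGED_FIELDS = ("username", "jira_task_id", "jira_timestamp",
                 "windows_hostname", "windows_timestamp")


def search_correlations(base_url: str = "http://localhost:9200", size: int = 5) -> Dict[str, Any]:
    """Fetch a few correlation documents (requires a reachable cluster)."""
    url = f"{base_url}/correlation-jira-windows-*/_search?size={size}"
    req = urllib.request.Request(url)
    user = os.environ.get("LOGSTASH_USER")
    password = os.environ.get("LOGSTASH_PASS")
    if user and password:
        token = base64.b64encode(f"{user}:{password}".encode()).decode()
        req.add_header("Authorization", f"Basic {token}")
    with urllib.request.urlopen(req) as resp:
        return json.load(resp)


def summarize_hits(response: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Return one dict of merged fields per hit; missing fields become None."""
    hits = response.get("hits", {}).get("hits", [])
    return [{f: hit.get("_source", {}).get(f) for f in MERGED_FIELDS} for hit in hits]


# Usage (needs a running cluster):
# for row in summarize_hits(search_correlations()):
#     print(row)
```

A document whose jira_task_id and windows_hostname are both populated indicates the pipeline is producing merged output.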
Performance Considerations
Memory Usage
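- The aggregate filter keeps one map per task_id value (here, per username) in the JVM heap until that map's timeout expires
- Monitor heap usage when many unique users are active within one timeout window
- Increase the Logstash JVM heap (jvm.options) if needed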
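Heap use scales with the number of maps alive at the same time. Since a map expires timeout seconds after the first event that created it, that number is roughly the count of distinct usernames first seen within one timeout window. The hypothetical peak_live_maps below estimates the peak from (timestamp, username) pairs; it evicts expired maps on every event, whereas the real filter evicts on a periodic flush, so treat the result as an estimate rather than an exact figure.

```python
from typing import Dict, Iterable, Tuple


def peak_live_maps(events: Iterable[Tuple[float, str]], timeout_s: float) -> int:
    """Estimate the peak number of aggregate maps held at once.

    events: (timestamp_seconds, username) pairs in processing order.
    A map is created by the first event for a username and expires
    timeout_s seconds later; a later event for that username starts a new map.
    """
    created: Dict[str, float] = {}  # username -> creation time of its live map
    peak = 0
    for ts, user in events:
        # Drop maps whose lifetime has elapsed
        expired = [u for u, t0 in created.items() if ts - t0 >= timeout_s]
        for u in expired:
            del created[u]
        created.setdefault(user, ts)
        peak = max(peak, len(created))
    return peak
```

Multiplying the peak by a measured per-map size (which depends on the fields stored) gives a rough heap budget for the correlation state.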
Performance Tuning
- Set timeout to the actual time gap between related Jira and Windows events
- Keep pipeline.workers at 1: the aggregate filter only works correctly with a single worker, so handle higher volume by narrowing the query rather than adding workers
- Use query filters to limit the data each run processes
Syntax validation (step 1) only confirms that the configuration parses. For production deployment, test with your actual data volumes and adjust timeout to your specific requirements.
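For choosing the timeout, one heuristic (not something the aggregate filter computes for you) is to take a high percentile of the observed gap between each user's Jira and Windows events and add a margin, because the timeout runs from the first event for a username. The helper suggest_timeout is hypothetical and uses the nearest-rank percentile. By default the aggregate filter measures the timeout against Logstash system time rather than @timestamp, which matters when events are read back in scheduled batches.

```python
import math
from typing import List, Tuple


def suggest_timeout(pairs: List[Tuple[float, float]], percentile: float = 95.0) -> float:
    """Nearest-rank percentile of |windows_ts - jira_ts|, in seconds.

    pairs: (jira_timestamp_s, windows_timestamp_s) for events that belong together.
    """
    gaps = sorted(abs(w - j) for j, w in pairs)
    if not gaps:
        raise ValueError("need at least one jira/windows timestamp pair")
    rank = math.ceil(percentile / 100 * len(gaps))
    return gaps[max(rank, 1) - 1]


# Hypothetical gaps of 1, 2, 5 and 15 minutes
sample = [(0, 60), (0, 120), (0, 300), (0, 900)]
print(suggest_timeout(sample))        # 900
print(suggest_timeout(sample, 50.0))  # 120
```

With only a handful of pairs the high percentile is simply the largest gap; with real data, add headroom on top of the result rather than using it as-is.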