data-pipeline-engineering

Data pipeline design, ETL processes, Spring Integration patterns, batch processing for political data

Updated 2/22/2026

Data Pipeline Engineering Skill

Purpose

Design and implement robust data pipelines for the CIA (Citizen Intelligence Agency) platform that extract, transform, and load Swedish political data from multiple sources into the platform's internal data model. This skill covers Spring Integration, Spring Batch processing, and monitoring patterns.

When to Use

  • ✅ Building data import pipelines for Riksdagen data
  • ✅ Designing ETL workflows for political data aggregation
  • ✅ Implementing batch processing for large datasets
  • ✅ Creating data refresh and synchronization jobs
  • ✅ Monitoring pipeline health and data quality

Do NOT use for:

  • ❌ Real-time API request handling (use api-integration skill)
  • ❌ UI data binding (use vaadin-component-design skill)

Pipeline Architecture

┌────────────┐    ┌────────────┐    ┌────────────┐    ┌────────────┐
│  Extract   │───▶│ Transform  │───▶│   Load     │───▶│  Monitor   │
│            │    │            │    │            │    │            │
│ API Fetch  │    │ Validate   │    │ JPA Upsert │    │ Metrics    │
│ XML/JSON   │    │ Map Fields │    │ Batch Save │    │ Alerts     │
│ Pagination │    │ Enrich     │    │ Index      │    │ Dashboards │
└────────────┘    └────────────┘    └────────────┘    └────────────┘
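
The four stages in the diagram can be sketched without any framework. This is a minimal illustration, not the platform's implementation: `PipelineSketch`, `RunReport`, and the in-memory `sink` are hypothetical names standing in for the real extractors, mappers, and JPA upsert.

```java
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

/** Hypothetical, framework-free model of the four stages in the diagram. */
final class PipelineSketch {

    /** Counts handed to the Monitor stage at the end of a run. */
    record RunReport(int extracted, int rejected, int loaded) {}

    /**
     * Runs Extract -> Transform -> Load -> Monitor over in-memory data.
     * The sink list stands in for the JPA upsert (assumption).
     */
    static <S, T> RunReport run(List<S> extracted,
                                Predicate<S> validator,
                                Function<S, T> mapper,
                                List<T> sink) {
        int rejected = 0;
        int loaded = 0;
        for (S record : extracted) {
            if (!validator.test(record)) {   // Transform: validate
                rejected++;
                continue;
            }
            sink.add(mapper.apply(record));  // Transform: map fields, then Load
            loaded++;
        }
        // Monitor: the caller can publish these counts as metrics
        return new RunReport(extracted.size(), rejected, loaded);
    }
}
```

Keeping the counts in one report object makes it easy to alert when the rejected share of a run grows unexpectedly.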

Spring Integration Patterns

Message-Driven Pipeline

@Configuration
public class RiksdagPipelineConfig {

    @Bean
    public IntegrationFlow riksdagImportFlow() {
        return IntegrationFlow
            .from(pollingSource(), e -> e.poller(
                Pollers.cron("0 0 2 * * *")     // Daily at 2 AM
                    .maxMessagesPerPoll(1)
                    .errorHandler(pipelineErrorHandler())))
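            .channel("riksdagRawChannel")
            .transform(xmlToJsonTransformer())
            .split(personListSplitter())                 // One message per person record
            .channel(c -> c.executor(taskExecutor()))    // Process records in parallel
            .filter(dataQualityFilter())                 // Drops records that fail validation
            .transform(entityMapper())
            // Dropped records leave gaps in the splitter's sequence, so
            // batchAggregator() must release by size or timeout instead of
            // waiting for the complete sequence (the aggregator's default).
            .aggregate(batchAggregator())
            .handle(jpaOutboundAdapter())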
            .get();
    }

    @Bean
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(4);
        executor.setMaxPoolSize(8);
        executor.setQueueCapacity(1000);
        executor.setThreadNamePrefix("riksdag-pipeline-");
        return executor;
    }
}
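
In the flow above, `filter` sits between `split` and `aggregate`. An aggregator with default settings waits for every message in the splitter's sequence, so a batch aggregator in this position presumably releases groups by size or timeout instead. The plain-Java sketch below shows size-based release with a final flush of the partial batch. `BatchingSketch` and `filterAndBatch` are hypothetical names; the source does not show how `batchAggregator()` is configured.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical sketch of filter-then-batch with size-based release. */
final class BatchingSketch {

    static <T> List<List<T>> filterAndBatch(List<T> items, Predicate<T> quality, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        List<T> current = new ArrayList<>();
        for (T item : items) {
            if (!quality.test(item)) {
                continue;                      // Dropped, like a message rejected by the filter
            }
            current.add(item);
            if (current.size() == batchSize) { // Release as soon as the batch is full
                batches.add(current);
                current = new ArrayList<>();
            }
        }
        if (!current.isEmpty()) {
            batches.add(current);              // Flush the partial batch (what a timeout would do)
        }
        return batches;
    }
}
```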

Error Channel Configuration

@Bean
public IntegrationFlow errorFlow() {
    return IntegrationFlow
        .from("errorChannel")
        .handle(message -> {
            // errorChannel payloads are Throwables; most, but not all, are MessagingExceptions
            Throwable error = (Throwable) message.getPayload();
            LOG.error("Pipeline error: {}", error.getMessage(), error);
            metricsService.incrementErrorCount("riksdag-pipeline");

            // Route to dead letter queue for manual review
            Message<?> failedMessage = error instanceof MessagingException me
                ? me.getFailedMessage()   // May itself be null
                : null;
            if (failedMessage != null) {
                deadLetterRepository.save(new DeadLetterEntry(
                    failedMessage.getPayload().toString(),
                    error.getMessage(),
                    Instant.now()
                ));
            }
        })
        .get();
}
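
The source does not show `DeadLetterEntry`. One possible shape, matching the three constructor arguments used above, is sketched here as a plain value type. The 4000-character cap and the placeholder strings are assumptions, and a persisted version would be a JPA entity class rather than a record.

```java
import java.time.Instant;
import java.util.Objects;

/** Hypothetical value type for a failed message awaiting manual review. */
record DeadLetterEntry(String payload, String errorMessage, Instant failedAt) {

    /** Assumed storage limit; adjust to the real column size. */
    static final int MAX_PAYLOAD_CHARS = 4000;

    DeadLetterEntry {
        Objects.requireNonNull(failedAt, "failedAt");
        // Exceptions may carry no message and messages may carry huge payloads,
        // so normalize both before the entry is stored.
        payload = truncate(payload == null ? "<no payload>" : payload);
        errorMessage = errorMessage == null ? "<no message>" : errorMessage;
    }

    private static String truncate(String value) {
        return value.length() <= MAX_PAYLOAD_CHARS
            ? value
            : value.substring(0, MAX_PAYLOAD_CHARS);
    }
}
```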

Batch Processing

Spring Batch Job Configuration

@Configuration
public class VoteImportBatchConfig {

    @Bean
    public Job voteImportJob(JobRepository jobRepository, Step importStep) {
        return new JobBuilder("voteImportJob", jobRepository)
            .incrementer(new RunIdIncrementer())
            .start(importStep)
            .build();
    }

    @Bean
    public Step importStep(JobRepository jobRepository,
                           PlatformTransactionManager txManager) {
        return new StepBuilder("importVotes", jobRepository)
            .<RiksdagVote, VoteData>chunk(500, txManager)   // Vote records: see Chunk Size Guidelines
            .reader(voteReader())
            .processor(voteProcessor())
            .writer(voteWriter())
            .faultTolerant()
            .retryLimit(3)                                   // Retry transient database errors
            .retry(TransientDataAccessException.class)
            .skipLimit(10)                                   // Fail the step once more than 10 records are skipped
            .skip(DataValidationException.class)
            .listener(stepListener())
            .build();
    }
}
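
The retry and skip settings interact as follows: transient failures are retried, invalid records are skipped, and the step fails once either limit is exhausted. The sketch below models that per item in plain Java. It is a deliberate simplification and not Spring Batch's actual algorithm, which also rolls back and re-processes chunks. `FaultToleranceSketch` and its exception types are hypothetical, and the retry limit is assumed to count total attempts.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical per-item model of retry and skip limits. */
final class FaultToleranceSketch {

    static final class TransientFailure extends RuntimeException {}
    static final class InvalidRecord extends RuntimeException {}

    record Result<O>(List<O> written, int skipped) {}

    static <I, O> Result<O> process(List<I> items, Function<I, O> processor,
                                    int retryLimit, int skipLimit) {
        List<O> written = new ArrayList<>();
        int skipped = 0;
        for (I item : items) {
            int attempts = 0;
            while (true) {
                attempts++;
                try {
                    written.add(processor.apply(item));
                    break;                                   // Success: next item
                } catch (TransientFailure e) {
                    if (attempts >= retryLimit) {
                        throw e;                             // Retries exhausted: fail the step
                    }
                } catch (InvalidRecord e) {
                    skipped++;
                    if (skipped > skipLimit) {
                        throw new IllegalStateException("skip limit exceeded", e);
                    }
                    break;                                   // Skip this item, keep going
                }
            }
        }
        return new Result<>(written, skipped);
    }
}
```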

Chunk Size Guidelines

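| Data Type      | Records/Batch  | Chunk Size | Reason                            |
|----------------|----------------|------------|-----------------------------------|
| Person data    | ~350           | 50         | Small dataset, frequent updates   |
| Vote records   | ~100K/session  | 500        | Large dataset, bulk insert        |
| Documents      | ~50K           | 100        | Variable size, careful processing |
| Committee data | ~100           | 25         | Small, relational integrity       |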
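
Since each chunk is committed in its own transaction, the chunk size determines roughly how many commits a run needs. The arithmetic for the rows above can be sketched as a ceiling division; `ChunkMath` is a hypothetical helper, and the record counts are the table's approximate figures.

```java
/** Hypothetical helper: number of chunk transactions for a dataset. */
final class ChunkMath {

    /** Ceiling division: a final partial chunk still needs its own commit. */
    static int chunkCount(int records, int chunkSize) {
        if (records < 0 || chunkSize <= 0) {
            throw new IllegalArgumentException("records must be >= 0 and chunkSize > 0");
        }
        return (records + chunkSize - 1) / chunkSize;
    }
}
```

With the table's figures this gives about 7 commits for person data (350 / 50), 200 for a session's vote records (100,000 / 500), 500 for documents (50,000 / 100), and 4 for committee data (100 / 25).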

Data Transformation Patterns

Field Mapping

@Component
public class RiksdagEntityMapper {

    public PersonData mapPerson(RiksdagPerson source) {
        PersonData target = new PersonData();
        target.setId(source.getIntressentId());
        target.setFirstName(sanitize(source.getFornamn()));
        target.setLastName(sanitize(source.getEfternamn()));
        target.setParty(normalizeParty(source.getParti()));
        target.setBornYear(parseYear(source.getFoddAr()));
        target.setGender(normalizeGender(source.getKon()));
        target.setStatus(source.getStatus());
        target.setImportTimestamp(Instant.now());
        return target;
    }

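    private String sanitize(String input) {
        if (input == null) return null;
        // Strip control characters first so trim() also removes whitespace they were hiding
        String cleaned = input.replaceAll("\\p{Cntrl}", "").trim();
        return cleaned.substring(0, Math.min(cleaned.length(), 255));  // Cap at 255 characters
    }

    private String normalizeParty(String party) {
        return Optional.ofNullable(party)
            .map(String::trim)
            .filter(p -> !p.isEmpty())     // Treat blank codes like missing ones
            .map(String::toUpperCase)
            .orElse("-");                  // Placeholder when no party code is present
    }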
}
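
The mapper calls `parseYear` and `normalizeGender`, which the source does not show. A possible sketch follows. Everything here is an assumption: the return types, the 1900 lower bound, the `MALE`/`FEMALE`/`UNKNOWN` codes, and the guess that the source field uses the Swedish values `man` and `kvinna`.

```java
import java.time.Year;
import java.util.Locale;

/** Hypothetical helpers for the mapper; not taken from the source. */
final class MapperHelpersSketch {

    /** Returns the birth year, or null when the value is missing or implausible. */
    static Integer parseYear(String raw) {
        if (raw == null) {
            return null;
        }
        try {
            int year = Integer.parseInt(raw.trim());
            boolean plausible = year >= 1900 && year <= Year.now().getValue();
            return plausible ? year : null;
        } catch (NumberFormatException e) {
            return null;   // Non-numeric input is treated as missing
        }
    }

    /** Maps the assumed Swedish source values to internal codes. */
    static String normalizeGender(String raw) {
        if (raw == null) {
            return "UNKNOWN";
        }
        return switch (raw.trim().toLowerCase(Locale.ROOT)) {
            case "man" -> "MALE";
            case "kvinna" -> "FEMALE";
            default -> "UNKNOWN";
        };
    }
}
```

Returning null or `UNKNOWN` instead of throwing keeps one malformed field from rejecting an otherwise valid record; whether that is the right trade-off depends on the data quality rules in use.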

Scheduling and Orchestration

@Component
public class PipelineScheduler {

    @Scheduled(cron = "0 0 2 * * *")   // Daily full refresh
    public void dailyFullImport() {
        LOG.info("Starting daily full import");
        importService.importAll();
    }

    @Scheduled(cron = "0 */15 8-18 * * MON-FRI")  // Every 15 min, 08:00-18:45 on weekdays
    public void incrementalVoteImport() {
        if (riksdagSessionActive()) {
            LOG.info("Starting incremental vote import");
            importService.importRecentVotes();
        }
    }
}
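
`@Scheduled` evaluates cron expressions in the server's default time zone unless its `zone` attribute is set, so a server running in UTC would shift both jobs relative to Swedish local time. The source does not show `riksdagSessionActive()`; a real version would presumably consult the parliamentary calendar. As a rough stand-in, the hypothetical guard below repeats the cron window (weekdays, hours 8 through 18) in Stockholm time.

```java
import java.time.DayOfWeek;
import java.time.ZoneId;
import java.time.ZonedDateTime;

/** Hypothetical time-window guard mirroring the incremental import's cron window. */
final class SessionWindowSketch {

    static final ZoneId STOCKHOLM = ZoneId.of("Europe/Stockholm");

    /** True on weekdays between 08:00 and 18:59 Stockholm time. */
    static boolean inImportWindow(ZonedDateTime moment) {
        ZonedDateTime local = moment.withZoneSameInstant(STOCKHOLM);
        DayOfWeek day = local.getDayOfWeek();
        boolean weekday = day != DayOfWeek.SATURDAY && day != DayOfWeek.SUNDAY;
        int hour = local.getHour();
        return weekday && hour >= 8 && hour <= 18;
    }
}
```

Passing the moment in as a parameter, rather than calling `ZonedDateTime.now()` inside, keeps the check testable with fixed timestamps.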

Monitoring and Observability

Pipeline Metrics

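import io.micrometer.core.instrument.MeterRegistry;
import java.util.concurrent.TimeUnit;
import org.springframework.stereotype.Component;

@Component
public class PipelineMetrics {

    private final MeterRegistry meterRegistry;

    // Constructor injection; the final field must be assigned for the class to compile
    public PipelineMetrics(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
    }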

    public void recordImport(String pipeline, int recordCount, long durationMs) {
        meterRegistry.counter("pipeline.records.imported",
            "pipeline", pipeline).increment(recordCount);
        meterRegistry.timer("pipeline.duration",
            "pipeline", pipeline).record(durationMs, TimeUnit.MILLISECONDS);
    }

    public void recordError(String pipeline, String errorType) {
        meterRegistry.counter("pipeline.errors",
            "pipeline", pipeline,
            "error_type", errorType).increment();
    }
}

Security Considerations

  • Sanitize all imported data — apply input validation before persistence
  • Use transactions — ensure atomicity for batch operations
  • Limit concurrency — prevent resource exhaustion with bounded thread pools
  • Secure credentials — externalize API keys, use Spring Vault or env vars
  • Audit data imports — log source, count, and timestamp for traceability
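
The audit bullet above names three fields to log: source, count, and timestamp. One way to carry them is a small immutable record, sketched here. `ImportAuditEntry` and its log format are hypothetical; stripping control characters from the source name is an added precaution against forged log lines, in the spirit of the sanitization bullet.

```java
import java.time.Instant;
import java.util.Objects;

/** Hypothetical audit record for one import run. */
record ImportAuditEntry(String source, int recordCount, Instant importedAt) {

    ImportAuditEntry {
        Objects.requireNonNull(source, "source");
        Objects.requireNonNull(importedAt, "importedAt");
        if (recordCount < 0) {
            throw new IllegalArgumentException("recordCount must not be negative");
        }
        // Remove control characters so a crafted source name cannot split the log line
        source = source.replaceAll("\\p{Cntrl}", "");
    }

    /** Single-line representation suitable for a structured log message. */
    String toLogLine() {
        return "import source=" + source + " count=" + recordCount + " at=" + importedAt;
    }
}
```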

ISMS Alignment

| Control          | Requirement                           |
|------------------|---------------------------------------|
| ISO 27001 A.8.10 | Information deletion / data retention |
| ISO 27001 A.5.33 | Protection of records                 |
| NIST CSF PR.DS-1 | Data-at-rest protection               |
| CIS Control 3    | Data protection                       |

Install

Requires askill CLI v1.0+

AI Quality Score

82/100 (analyzed 2/23/2026)

Comprehensive skill covering Spring Integration and Spring Batch patterns for data pipeline engineering. Well-structured with clear when-to-use guidance, code examples, and ISMS alignment. Slightly project-specific due to Riksdagen data focus but patterns are broadly reusable. Good safety considerations with error handling and monitoring.

Metadata

License: unknown
Version: -
Updated: 2/22/2026
Publisher: Hack23

Tags

api, ci-cd, observability, security