Spring Integration Using DSL - Folder Monitor or Watch Program

Spring Integration:
Spring Integration provides a lightweight messaging framework inside Spring applications.
It integrates with external systems through adapters.

DSL - The Java DSL provides builders and a fluent API for configuring Spring Integration in code.
IntegrationFlow - Defines how messages move through the adapters and endpoints of one pipeline.
Files.inboundAdapter - Builds a message source that reads files from a directory.
filter - Accepts or rejects items based on a condition.
Pollers.fixedDelay - Triggers the source periodically, with a fixed delay between polls.
transform - Converts a message payload from one form to another as it moves between channels.
handle - Passes the message to a method on another bean for processing.
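
To make the order of these steps concrete, here is a small plain-Java sketch that mimics the filter, transform and handle chain without Spring. The names MiniFlow, MiniFlowDemo, runOnce and demo are made up for illustration and are not part of Spring Integration; the real framework passes messages over channels rather than calling functions directly.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical stand-in for a flow: polled items -> filter -> transform -> handle.
final class MiniFlow<S, T> {
    private final Predicate<S> filter;
    private final Function<S, T> transformer;
    private final Consumer<T> handler;

    MiniFlow(Predicate<S> filter, Function<S, T> transformer, Consumer<T> handler) {
        this.filter = filter;
        this.transformer = transformer;
        this.handler = handler;
    }

    // One "poll": each item goes through the chain in order.
    void runOnce(List<S> polledItems) {
        for (S item : polledItems) {
            if (!filter.test(item)) {
                continue; // filter: rejected items never reach the next steps
            }
            T payload = transformer.apply(item); // transform: convert the payload
            handler.accept(payload);             // handle: hand over for processing
        }
    }
}

final class MiniFlowDemo {
    static List<String> demo() {
        List<String> handled = new ArrayList<>();
        MiniFlow<String, String> flow = new MiniFlow<String, String>(
                name -> name.endsWith(".txt"), // keep only .txt names
                String::toUpperCase,           // stand-in for a real transformer
                handled::add);                 // stand-in for a real handler
        flow.runOnce(Arrays.asList("a.txt", "b.log", "c.txt"));
        return handled;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [A.TXT, C.TXT]
    }
}
```

The rejected item (b.log) is dropped at the filter step, so the transformer and the handler only ever see accepted items.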

Dependencies:
<dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-integration</artifactId>
    </dependency>

    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>

    <!-- Spring Integration - Java DSL -->
    <dependency>
      <groupId>org.springframework.integration</groupId>
      <artifactId>spring-integration-java-dsl</artifactId>
      <version>1.0.0.RELEASE</version>
    </dependency>
</dependencies>

Add the following beans to the main Spring Boot class. DIRECTORY is a constant holding the path of the folder to watch:
@Bean
public IntegrationFlow integrationFlow() {
    return IntegrationFlows
            // Poll DIRECTORY with a 1000 ms delay between polls; the filter
            // lets through only new or modified files
            .from(Files.inboundAdapter(new File(DIRECTORY))
                            .filter(new LastModifiedFileFilter()),
                    c -> c.poller(Pollers.fixedDelay(1000)))
            // Read each file's content into a String payload
            .transform(fileToStringTransformer())
            // Call the process method on the "fileProcessor" bean
            .handle("fileProcessor", "process")
            .get();
}

@Bean
public FileToStringTransformer fileToStringTransformer() {
    return new FileToStringTransformer();
}

@Bean
public FileProcessor fileProcessor() {
    return new FileProcessor();
}
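
For readers who want to see what "poll a folder with a fixed delay" means at the JDK level, here is a rough stdlib-only sketch. It is not how Spring Integration is implemented; SimpleDirectoryPoller and listFileNames are hypothetical names, and the folder defaults to the current directory only for the sake of the demo.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical stdlib-only poller, for illustration only.
final class SimpleDirectoryPoller {

    // Returns the sorted names of the regular files currently in the directory.
    static List<String> listFileNames(File directory) {
        File[] entries = directory.listFiles(File::isFile);
        List<String> names = new ArrayList<>();
        if (entries != null) { // listFiles returns null if the path is not a readable directory
            for (File entry : entries) {
                names.add(entry.getName());
            }
        }
        Collections.sort(names);
        return names;
    }

    public static void main(String[] args) throws InterruptedException {
        File directory = new File(args.length > 0 ? args[0] : ".");
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        // Fixed delay: the next scan starts 1000 ms after the previous scan finishes.
        scheduler.scheduleWithFixedDelay(
                () -> System.out.println(listFileNames(directory)),
                0, 1000, TimeUnit.MILLISECONDS);
        Thread.sleep(3500); // let a few scans run, then stop
        scheduler.shutdownNow();
    }
}
```

With a fixed delay, a slow scan pushes the next one back instead of letting scans pile up, which is usually what you want for a folder watcher.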

The processor class below prints the file name and content of each message:
public class FileProcessor {
    private static final String HEADER_FILE_NAME = "file_name";
    private static final String MSG = "%s received. Content: %s";

    public void process(Message<String> msg) {
        String fileName = (String) msg.getHeaders().get(HEADER_FILE_NAME);
        String content = msg.getPayload();

        System.out.println(String.format(MSG, fileName, content));
    }
}


The filter below accepts a file only if it is new or its last-modified time has changed since the previous poll:
public class LastModifiedFileFilter extends AbstractFileListFilter<File> {
    // Last-modified time seen for each file name
    private final Map<String, Long> files = new HashMap<>();
    private final Object monitor = new Object();

    @Override
    protected boolean accept(File file) {
        synchronized (this.monitor) {
            long lastModified = file.lastModified();
            Long previousModifiedTime = files.put(file.getName(), lastModified);

            // Accept files seen for the first time or modified since the last poll
            return previousModifiedTime == null || previousModifiedTime != lastModified;
        }
    }
}
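
The bookkeeping inside this filter can be tried out without Spring. The sketch below copies only the map logic into a hypothetical class named ModifiedTracker (not part of any library), taking the file name and timestamp as plain arguments so the behaviour is easy to check.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, Spring-free copy of the filter's bookkeeping.
final class ModifiedTracker {
    // Last-modified time recorded for each file name
    private final Map<String, Long> seen = new HashMap<>();

    // Returns true for a name seen for the first time or with a changed timestamp.
    synchronized boolean accept(String name, long lastModified) {
        Long previous = seen.put(name, lastModified);
        return previous == null || previous != lastModified;
    }
}

final class ModifiedTrackerDemo {
    public static void main(String[] args) {
        ModifiedTracker tracker = new ModifiedTracker();
        System.out.println(tracker.accept("a.txt", 100L)); // prints true  (new file)
        System.out.println(tracker.accept("a.txt", 100L)); // prints false (unchanged)
        System.out.println(tracker.accept("a.txt", 200L)); // prints true  (modified)
        System.out.println(tracker.accept("b.txt", 100L)); // prints true  (another new file)
    }
}
```

Note that the map is keyed by file name only and entries are never removed, so it grows with the number of distinct file names seen.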
