java - Getting started with Esper sockets

Author: Internet

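I'm new to Esper and would appreciate some help. I have managed to use Esper with CSV files, but now I need to use Java objects as events sent over a socket, and I can't find a simple example online to use as a guide.

Is there a simple example I could base my work on?

In any case, here is the code I am trying to get working. Nothing happens when I run it; it seems that the socket connection is not working.

The server class (it also contains the event class). It should send the events: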

import java.io.* ;
import java.net.* ;

class Server {

    static final int PORT = 5002;

    public Server( ) {
        try {
            ServerSocket skServer = new ServerSocket( PORT );
            System.out.println("Listening at " + PORT );
            Socket skClient = skServer.accept();        
            System.out.println("Serving to Esper");
            OutputStream aux = skClient.getOutputStream();
            ObjectOutputStream flux = new ObjectOutputStream( aux );
            int i = 0;
            while (i < 10) {
                flux.writeObject( new MeteoEvent(i,"A") );
                i++;
            }
            flux.flush();
            skClient.close();
            System.out.println("End of transmission");
        } catch( Exception e ) {
            System.out.println( e.getMessage() );
        }
    }

    public static void main( String[] arg ) {
        new Server();
    }

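    // static and Serializable, so that ObjectOutputStream.writeObject can
    // serialize it without dragging in the enclosing Server instance
    static class MeteoEvent implements Serializable {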

        private int sensorId;
        private String GeoArea;

        public MeteoEvent() {
        }

        public MeteoEvent(int sensorid, String geoarea) {
            this.sensorId = sensorid;
            this.GeoArea = geoarea;
        }

        public int getSensorId() {
            return sensorId;
        }

        public void setSensorId(int sensorId) {
            this.sensorId = sensorId;
        }

        public String getGeoArea() {
            return GeoArea;
        }

        public void setGeoArea(String geoArea) {
            GeoArea = geoArea;
        }   
    }
}

And the Esper-based class:

import java.io.File;
import java.io.IOException;
import java.sql.SQLException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;


import com.espertech.esper.client.Configuration;
import com.espertech.esper.client.EPAdministrator;
import com.espertech.esper.client.EPRuntime;
import com.espertech.esper.client.EPServiceProvider;
import com.espertech.esper.client.EPServiceProviderManager;
import com.espertech.esper.client.EPStatement;
import com.espertech.esper.client.EventBean;
import com.espertech.esper.client.UpdateListener;
import com.espertech.esper.event.map.MapEventBean;
import com.espertech.esperio.socket.EsperIOSocketAdapter;
import com.espertech.esperio.socket.config.ConfigurationSocketAdapter;
import com.espertech.esperio.socket.config.DataType;
import com.espertech.esperio.socket.config.SocketConfig;

public class Demo {

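    public static class CEPListener implements UpdateListener {

        private String tag;
        public CEPListener (String tag)
        {
            this.tag = tag;
        }

        // The listener body was cut off in the original post; this minimal
        // placeholder just prints each new event with the listener's tag.
        public void update(EventBean[] newEvents, EventBean[] oldEvents) {
            if (newEvents == null) {
                return;
            }
            for (EventBean event : newEvents) {
                System.out.println(tag + ": " + event.getUnderlying());
            }
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException {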
        Configuration configuration = new Configuration();

        Map<String, Object> eventProperties = new HashMap<String, Object>();
        eventProperties.put("sensorId", int.class);
        eventProperties.put("GeoArea", String.class);
        configuration.addEventType("MeteoEvent", eventProperties);

        ConfigurationSocketAdapter socketAdapterConfig = new ConfigurationSocketAdapter();

        SocketConfig socketConfig = new SocketConfig();
        socketConfig.setDataType(DataType.OBJECT);
        socketConfig.setPort(5002);
        socketAdapterConfig.getSockets().put("MeteoSocket", socketConfig);

        EPServiceProvider cepService = EPServiceProviderManager.getProvider("MeteoSocket",configuration);

        EPRuntime cepServiceRT = cepService.getEPRuntime();

        EPAdministrator cepAdmin = cepService.getEPAdministrator();

        EsperIOSocketAdapter socketAdapter = new EsperIOSocketAdapter (socketAdapterConfig, "MeteoSocket");
        socketAdapter.start();

        EPStatement stmt = cepAdmin.createEPL("insert into JoinStream select * from MeteoEvent");

        EPStatement outputStatementX = cepAdmin.createEPL("select * from JoinStream");

        outputStatementX.addListener(new CEPListener("JS"));

        cepService.initialize();

        // Block the main thread so the process keeps running
        Object lock = new Object();
        synchronized (lock)
        {
            lock.wait();
        }
    }
}

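If anyone can spare some time to help me, thank you very much.

Solution:

Problem solved! The Esper developer mailing list was very helpful. I learned how to use Esper sockets from the test class located here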

Best regards!
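
For readers who cannot reach that test class, here is a minimal sketch of the general pattern, not the code from the Esper tests. It rests on two assumptions. First, as far as the EsperIO documentation describes it, the socket adapter itself listens on the configured port, so the event sender should connect as a client with `new Socket(...)` rather than open its own `ServerSocket` on the same port. Second, events written with `ObjectOutputStream` must be `Serializable`. To keep the sketch runnable without Esper on the classpath, the listening side is played by a hypothetical stand-in method, `receiveAll`, which is not part of any Esper API; `SocketEventSketch`, `sendEvents` and `sendAndReceive` are likewise illustrative names.

```java
import java.io.EOFException;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Top-level and Serializable, so ObjectOutputStream.writeObject can serialize it.
class MeteoEvent implements Serializable {
    private static final long serialVersionUID = 1L;
    private final int sensorId;
    private final String geoArea;

    MeteoEvent(int sensorId, String geoArea) {
        this.sensorId = sensorId;
        this.geoArea = geoArea;
    }

    int getSensorId() { return sensorId; }
    String getGeoArea() { return geoArea; }
}

class SocketEventSketch {

    // Hypothetical stand-in for the listening side (the role the socket adapter
    // is assumed to play): accept one connection and read objects until EOF.
    static List<Integer> receiveAll(ServerSocket server) throws Exception {
        List<Integer> ids = new ArrayList<>();
        try (Socket client = server.accept();
             ObjectInputStream in = new ObjectInputStream(client.getInputStream())) {
            while (true) {
                try {
                    MeteoEvent event = (MeteoEvent) in.readObject();
                    ids.add(event.getSensorId());
                } catch (EOFException endOfStream) {
                    break; // the sender closed the connection
                }
            }
        }
        return ids;
    }

    // The sender connects as a client instead of opening a ServerSocket.
    static void sendEvents(String host, int port, int count) throws IOException {
        try (Socket socket = new Socket(host, port);
             ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream())) {
            for (int i = 0; i < count; i++) {
                out.writeObject(new MeteoEvent(i, "A"));
            }
            out.flush();
        }
    }

    // Starts the stand-in listener on a free port, sends the events to it,
    // and returns the sensor ids the listener received.
    static List<Integer> sendAndReceive(int count) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try (ServerSocket server = new ServerSocket(0)) { // port 0: any free port
            Future<List<Integer>> received = pool.submit(() -> receiveAll(server));
            sendEvents("localhost", server.getLocalPort(), count);
            return received.get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("Received sensor ids: " + sendAndReceive(10));
    }
}
```

In a real setup, `sendEvents` would be pointed at the host and port given to `SocketConfig.setPort` (5002 in the question), and the stand-in listener would be dropped in favour of the started `EsperIOSocketAdapter`.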

Tags: sockets, esper, java, complex-event-processing
Source: https://codeday.me/bug/20191031/1978699.html