001package ball.upnp.ssdp;
002/*-
003 * ##########################################################################
004 * UPnP/SSDP Implementation Classes
005 * $Id: SSDPDiscoveryThread.java 5285 2020-02-05 04:23:21Z ball $
006 * $HeadURL: svn+ssh://svn.hcf.dev/var/spool/scm/repository.svn/ball-upnp/trunk/src/main/java/ball/upnp/ssdp/SSDPDiscoveryThread.java $
007 * %%
008 * Copyright (C) 2013 - 2020 Allen D. Ball
009 * %%
010 * Licensed under the Apache License, Version 2.0 (the "License");
011 * you may not use this file except in compliance with the License.
012 * You may obtain a copy of the License at
013 *
014 *      http://www.apache.org/licenses/LICENSE-2.0
015 *
016 * Unless required by applicable law or agreed to in writing, software
017 * distributed under the License is distributed on an "AS IS" BASIS,
018 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
019 * See the License for the specific language governing permissions and
020 * limitations under the License.
021 * ##########################################################################
022 */
023import java.io.IOException;
024import java.net.DatagramPacket;
025import java.net.DatagramSocket;
026import java.net.SocketException;
027import java.net.SocketTimeoutException;
028import java.util.concurrent.ConcurrentLinkedQueue;
029import java.util.concurrent.CopyOnWriteArrayList;
030import org.apache.http.ParseException;
031
032/**
033 * SSDP discovery {@link Thread} implementation.
034 *
035 * {@bean.info}
036 *
037 * @author {@link.uri mailto:ball@hcf.dev Allen D. Ball}
038 * @version $Revision: 5285 $
039 */
040public class SSDPDiscoveryThread extends Thread {
041    protected final int interval;
042    private final DatagramSocket socket;
043    private final CopyOnWriteArrayList<Listener> list =
044        new CopyOnWriteArrayList<>();
045    private final ConcurrentLinkedQueue<SSDPMessage> queue =
046        new ConcurrentLinkedQueue<>();
047    private final SSDPDiscoveryRequest request = new SSDPDiscoveryRequest();
048
049    /**
050     * Sole constructor.
051     *
052     * @param   interval        The minimum interval (in seconds) between
053     *                          broadcast messages.
054     *
055     * @throws  SocketException
056     *                          If the underlying {@link DatagramSocket}
057     *                          cannot be conditioned.
058     */
059    public SSDPDiscoveryThread(int interval) throws SocketException {
060        super();
061
062        if (interval > 0) {
063            this.interval = interval;
064        } else {
065            throw new IllegalArgumentException("interval=" + interval);
066        }
067
068        setDaemon(true);
069        setName(getClass().getName());
070
071        socket = new DatagramSocket();
072        socket.setBroadcast(true);
073        socket.setReuseAddress(true);
074        socket.setSoTimeout((interval * 1000) / 2);
075    }
076
077    /**
078     * Method to add a {@link Listener}.
079     *
080     * @param  listener         The {@link Listener}.
081     */
082    public void addListener(Listener listener) { list.add(listener); }
083
084    /**
085     * Method to remove a {@link Listener}.
086     *
087     * @param  listener         The {@link Listener}.
088     */
089    public void removeListener(Listener listener) { list.remove(listener); }
090
091    /**
092     * Method to queue a {@link SSDPMessage} for transmission.
093     *
094     * @param  message          The {@link SSDPMessage}.
095     */
096    public void queue(SSDPMessage message) {
097        if (message != null && (! queue.contains(message))) {
098            queue.add(message);
099        }
100    }
101
102    /**
103     * Callback periodically made to {@link #queue(SSDPMessage)} a
104     * {@link SSDPDiscoveryRequest} available to be intercepted (overridden)
105     * by subclass implementations.
106     */
107    protected void ping() { queue(request); }
108
109    @Override
110    public void start() {
111        new Thread() {
112            { setDaemon(true); }
113
114            @Override
115            public void run() {
116                for (;;) {
117                    ping();
118
119                    try {
120                        sleep(interval * 1000);
121                    } catch (InterruptedException exception) {
122                    }
123                }
124            }
125        }.start();
126
127        super.start();
128    }
129
130    @Override
131    public void run() {
132        try {
133            byte[] bytes = new byte[8 * 1024];
134            DatagramPacket packet = new DatagramPacket(bytes, bytes.length);
135
136            for (;;) {
137                SSDPMessage message = null;
138
139                while ((message = queue.poll()) != null) {
140                    for (Listener listener : list) {
141                        listener.sendEvent(this, message);
142                    }
143
144                    socket.send(message.toDatagramPacket());
145                }
146
147                try {
148                    packet.setData(bytes);
149                    socket.receive(packet);
150
151                    message = parse(packet);
152
153                    if (message != null) {
154                        for (Listener listener : list) {
155                            listener.receiveEvent(this, message);
156                        }
157                    }
158                } catch (SocketTimeoutException exception) {
159                }
160            }
161        } catch (IOException exception) {
162        } finally {
163            if (socket != null) {
164                socket.close();
165            }
166        }
167    }
168
169    private SSDPMessage parse(DatagramPacket packet) {
170        SSDPMessage message = null;
171
172        if (message == null) {
173            try {
174                message = new SSDPResponse(packet);
175            } catch (ParseException exception) {
176            }
177        }
178
179        if (message == null) {
180            try {
181                message = new SSDPRequest(packet);
182            } catch (ParseException exception) {
183            }
184        }
185
186        return message;
187    }
188
189    /**
190     * {@link SSDPDiscoveryThread} listener interface definition.
191     */
192    public interface Listener {
193
194        /**
195         * Callback made just before sending a {@link SSDPMessage}.
196         *
197         * @param       thread          The {@link SSDPDiscoveryThread}.
198         * @param       message         The {@link SSDPMessage}.
199         */
200        public void sendEvent(SSDPDiscoveryThread thread, SSDPMessage message);
201
202        /**
203         * Callback made after receiving a {@link SSDPMessage}.
204         *
205         * @param       thread          The {@link SSDPDiscoveryThread}.
206         * @param       message         The {@link SSDPMessage}.
207         */
208        public void receiveEvent(SSDPDiscoveryThread thread,
209                                 SSDPMessage message);
210    }
211}