001package ball.upnp.ssdp; 002/*- 003 * ########################################################################## 004 * UPnP/SSDP Implementation Classes 005 * $Id: SSDPDiscoveryThread.java 5285 2020-02-05 04:23:21Z ball $ 006 * $HeadURL: svn+ssh://svn.hcf.dev/var/spool/scm/repository.svn/ball-upnp/trunk/src/main/java/ball/upnp/ssdp/SSDPDiscoveryThread.java $ 007 * %% 008 * Copyright (C) 2013 - 2020 Allen D. Ball 009 * %% 010 * Licensed under the Apache License, Version 2.0 (the "License"); 011 * you may not use this file except in compliance with the License. 012 * You may obtain a copy of the License at 013 * 014 * http://www.apache.org/licenses/LICENSE-2.0 015 * 016 * Unless required by applicable law or agreed to in writing, software 017 * distributed under the License is distributed on an "AS IS" BASIS, 018 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 019 * See the License for the specific language governing permissions and 020 * limitations under the License. 021 * ########################################################################## 022 */ 023import java.io.IOException; 024import java.net.DatagramPacket; 025import java.net.DatagramSocket; 026import java.net.SocketException; 027import java.net.SocketTimeoutException; 028import java.util.concurrent.ConcurrentLinkedQueue; 029import java.util.concurrent.CopyOnWriteArrayList; 030import org.apache.http.ParseException; 031 032/** 033 * SSDP discovery {@link Thread} implementation. 034 * 035 * {@bean.info} 036 * 037 * @author {@link.uri mailto:ball@hcf.dev Allen D. Ball} 038 * @version $Revision: 5285 $ 039 */ 040public class SSDPDiscoveryThread extends Thread { 041 protected final int interval; 042 private final DatagramSocket socket; 043 private final CopyOnWriteArrayList<Listener> list = 044 new CopyOnWriteArrayList<>(); 045 private final ConcurrentLinkedQueue<SSDPMessage> queue = 046 new ConcurrentLinkedQueue<>(); 047 private final SSDPDiscoveryRequest request = new SSDPDiscoveryRequest(); 048 049 /** 050 * Sole constructor. 051 * 052 * @param interval The minimum interval (in seconds) between 053 * broadcast messages. 054 * 055 * @throws SocketException 056 * If the underlying {@link DatagramSocket} 057 * cannot be conditioned. 058 */ 059 public SSDPDiscoveryThread(int interval) throws SocketException { 060 super(); 061 062 if (interval > 0) { 063 this.interval = interval; 064 } else { 065 throw new IllegalArgumentException("interval=" + interval); 066 } 067 068 setDaemon(true); 069 setName(getClass().getName()); 070 071 socket = new DatagramSocket(); 072 socket.setBroadcast(true); 073 socket.setReuseAddress(true); 074 socket.setSoTimeout((interval * 1000) / 2); 075 } 076 077 /** 078 * Method to add a {@link Listener}. 079 * 080 * @param listener The {@link Listener}. 081 */ 082 public void addListener(Listener listener) { list.add(listener); } 083 084 /** 085 * Method to remove a {@link Listener}. 086 * 087 * @param listener The {@link Listener}. 088 */ 089 public void removeListener(Listener listener) { list.remove(listener); } 090 091 /** 092 * Method to queue a {@link SSDPMessage} for transmission. 093 * 094 * @param message The {@link SSDPMessage}. 095 */ 096 public void queue(SSDPMessage message) { 097 if (message != null && (! queue.contains(message))) { 098 queue.add(message); 099 } 100 } 101 102 /** 103 * Callback periodically made to {@link #queue(SSDPMessage)} a 104 * {@link SSDPDiscoveryRequest} available to be intercepted (overridden) 105 * by subclass implementations. 106 */ 107 protected void ping() { queue(request); } 108 109 @Override 110 public void start() { 111 new Thread() { 112 { setDaemon(true); } 113 114 @Override 115 public void run() { 116 for (;;) { 117 ping(); 118 119 try { 120 sleep(interval * 1000); 121 } catch (InterruptedException exception) { 122 } 123 } 124 } 125 }.start(); 126 127 super.start(); 128 } 129 130 @Override 131 public void run() { 132 try { 133 byte[] bytes = new byte[8 * 1024]; 134 DatagramPacket packet = new DatagramPacket(bytes, bytes.length); 135 136 for (;;) { 137 SSDPMessage message = null; 138 139 while ((message = queue.poll()) != null) { 140 for (Listener listener : list) { 141 listener.sendEvent(this, message); 142 } 143 144 socket.send(message.toDatagramPacket()); 145 } 146 147 try { 148 packet.setData(bytes); 149 socket.receive(packet); 150 151 message = parse(packet); 152 153 if (message != null) { 154 for (Listener listener : list) { 155 listener.receiveEvent(this, message); 156 } 157 } 158 } catch (SocketTimeoutException exception) { 159 } 160 } 161 } catch (IOException exception) { 162 } finally { 163 if (socket != null) { 164 socket.close(); 165 } 166 } 167 } 168 169 private SSDPMessage parse(DatagramPacket packet) { 170 SSDPMessage message = null; 171 172 if (message == null) { 173 try { 174 message = new SSDPResponse(packet); 175 } catch (ParseException exception) { 176 } 177 } 178 179 if (message == null) { 180 try { 181 message = new SSDPRequest(packet); 182 } catch (ParseException exception) { 183 } 184 } 185 186 return message; 187 } 188 189 /** 190 * {@link SSDPDiscoveryThread} listener interface definition. 191 */ 192 public interface Listener { 193 194 /** 195 * Callback made just before sending a {@link SSDPMessage}. 196 * 197 * @param thread The {@link SSDPDiscoveryThread}. 198 * @param message The {@link SSDPMessage}. 199 */ 200 public void sendEvent(SSDPDiscoveryThread thread, SSDPMessage message); 201 202 /** 203 * Callback made after receiving a {@link SSDPMessage}. 204 * 205 * @param thread The {@link SSDPDiscoveryThread}. 206 * @param message The {@link SSDPMessage}. 207 */ 208 public void receiveEvent(SSDPDiscoveryThread thread, 209 SSDPMessage message); 210 } 211}